// // Copyright 2019 Signal Messenger, LLC // SPDX-License-Identifier: AGPL-3.0-only // import Foundation public import LibSignalClient public import SignalRingRTC import SwiftProtobuf public protocol StorageServiceManager { typealias ManifestRotationMode = StorageServiceManagerManifestRotationMode /// Updates the local user's identity. /// /// Called during app launch, registration, and change number. func setLocalIdentifiers(_ localIdentifiers: LocalIdentifiers) /// Sets up Cron jobs. func registerForCron(_ cron: Cron) /// The version of the latest known Storage Service manifest. func currentManifestVersion(tx: DBReadTransaction) -> UInt64 /// Whether the latest-known Storage Service manifest contains a `recordIkm`. func currentManifestHasRecordIkm(tx: DBReadTransaction) -> Bool func recordPendingUpdates(updatedRecipientUniqueIds: [RecipientUniqueId]) func recordPendingUpdates(updatedAddresses: [SignalServiceAddress]) func recordPendingUpdates(updatedGroupV2MasterKeys: [GroupMasterKey]) func recordPendingUpdates(updatedStoryDistributionListIds: [Data]) func recordPendingUpdates(callLinkRootKeys: [CallLinkRootKey]) func recordPendingLocalAccountUpdates() func backupPendingChanges(authedDevice: AuthedDevice) @discardableResult func restoreOrCreateManifestIfNecessary(authedDevice: AuthedDevice, masterKeySource: StorageService.MasterKeySource) -> Promise func rotateManifest( mode: ManifestRotationMode, authedDevice: AuthedDevice, ) async throws /// Wipes all local state related to Storage Service, without mutating /// remote state. /// /// - Note /// The expected behavior after calling this method is that the next time we /// perform a backup we will create a brand-new manifest with version 1, as /// we have no local manifest version. However, since we still (probably) /// have a remote manifest this backup will be rejected, and we'll merge in /// the remote manifest, then re-attempt our backup. /// /// This is a weird behavior to specifically want, and new callers who are /// interested in forcing a manifest recreation should probably prefer /// ``rotateManifest`` instead. func resetLocalData(transaction: DBWriteTransaction) /// Waits for pending restores to finish. /// /// When this is resolved, it means the current device has the latest state /// available on storage service. /// /// If this device believes there's new state available on storage service /// but the request to fetch it has failed, this Promise will be rejected. /// /// If the local device doesn't believe storage service has new state, this /// will resolve without performing any network requests. /// /// Due to the asynchronous nature of network requests, it's possible for /// another device to write to storage service at the same time the returned /// Promise resolves. Therefore, the precise behavior of this method is best /// described as: "if this device has knowledge that storage service has new /// state at the time this method is invoked, the returned Promise will be /// resolved after that state has been fetched". func waitForPendingRestores() async throws /// Waits for pending operations to finish. func waitForSteadyState() async throws(CancellationError) } extension StorageServiceManager { public func recordPendingUpdates(groupModel: TSGroupModel) { if let groupModelV2 = groupModel as? TSGroupModelV2 { let masterKey: GroupMasterKey do { masterKey = try groupModelV2.masterKey() } catch { owsFailDebug("Missing master key: \(error)") return } recordPendingUpdates(updatedGroupV2MasterKeys: [masterKey]) } else { owsFailDebug("How did we end up with pending updates to a V1 group?") } } } public enum StorageServiceManagerManifestRotationMode { /// Recreate the manifest, preserving its contained data related to records. /// Since the record data is preserved, such as their identifiers and the /// `recordIkm`, the manifest can be inexpensively recreated in place /// leaving records untouched. /// /// - Note /// This mode is only applicable if we have previously migrated to using a /// `recordIkm`. If not, this mode is treated like `.alsoRotatingRecords`. case preservingRecordsIfPossible /// Recreate the manifest and all records, using local data as the source of /// truth for creating records. This deletes all existing records, replacing /// them with new ones with newly-generated identifiers; these new records /// will be encrypted using a newly-generated `recordIkm`. case alsoRotatingRecords /// Orders cases by precedence, with higher numbers more significant. private var precedenceOrder: Int { switch self { case .preservingRecordsIfPossible: return 0 case .alsoRotatingRecords: return 1 } } /// Merge the given mode into this one, returning the one with the higher /// precedence. fileprivate func mergeByPrecedence(_ other: Self) -> Self { if precedenceOrder >= other.precedenceOrder { return self } return other } } // MARK: - public class StorageServiceManagerImpl: NSObject, StorageServiceManager { private let appReadiness: AppReadiness init(appReadiness: AppReadiness) { self.appReadiness = appReadiness super.init() SwiftSingletons.register(self) if CurrentAppContext().isMainApp { appReadiness.runNowOrWhenAppWillBecomeReady { self.cleanUpUnknownData() } appReadiness.runNowOrWhenAppDidBecomeReadySync { [self] in NotificationCenter.default.addObserver( self, selector: #selector(self.willResignActive), name: .OWSApplicationWillResignActive, object: nil, ) NotificationCenter.default.addObserver( self, selector: #selector(self.didBecomeActive), name: .OWSApplicationDidBecomeActive, object: nil, ) NotificationCenter.default.addObserver( self, selector: #selector(self.backupPlanDidChange), name: .backupPlanChanged, object: nil, ) // On first launch, back up any pending changes from previous // launches. For the remainder of this launch we're covered by // the willResignActive and didBecomeActive listeners. backupPendingChanges(authedDevice: .implicit) Task { await self.cleanUpDeletedCallLinks() } } } } private static let restoreManifestCronKey: Cron.UniqueKey = .fetchStorageService private static let restoreManifestCronInterval: TimeInterval = .day public func registerForCron(_ cron: Cron) { cron.schedulePeriodically( uniqueKey: Self.restoreManifestCronKey, approximateInterval: Self.restoreManifestCronInterval, mustBeRegistered: true, mustBeConnected: true, operation: { try await self._restoreOrCreateManifestIfNecessary( authedDevice: .implicit, masterKeySource: .implicit, isRunningViaCron: true, ).awaitableWithUncooperativeCancellationHandling() }, ) } fileprivate static func updateRestoreManifestCronDate(tx: DBWriteTransaction) { CronStore(uniqueKey: restoreManifestCronKey) .setMostRecentDate(Date(), jitter: restoreManifestCronInterval / Cron.jitterFactor, tx: tx) } @objc private func willResignActive() { // If we have any pending changes, start a back up immediately // to try and make sure the service doesn't get stale. If for // some reason we aren't able to successfully complete this backup // while in the background we'll try again on the next app launch. backupPendingChanges(authedDevice: .implicit) } @objc private func didBecomeActive() { // We may have pending changes from before we resigned active that we // should back up as soon as we can, rather than waiting for a full app // launch. backupPendingChanges(authedDevice: .implicit) } @objc private func backupPlanDidChange() { // If the BackupPlan changed, we should update our AccountRecord. recordPendingLocalAccountUpdates() } public func setLocalIdentifiers(_ localIdentifiers: LocalIdentifiers) { updateManagerState { managerState in managerState.localIdentifiers = localIdentifiers } } // MARK: - public func currentManifestVersion(tx: DBReadTransaction) -> UInt64 { return StorageServiceOperation.State.current( transaction: tx, ).manifestVersion } public func currentManifestHasRecordIkm(tx: DBReadTransaction) -> Bool { return StorageServiceOperation.State.current( transaction: tx, ).manifestRecordIkm != nil } // MARK: - private struct ManagerState { /// The local user's identifiers. In the future, this should be provided /// when this class is initialized. For now, it's an Optional to handle the /// window between initialization and when the database is loaded. var localIdentifiers: LocalIdentifiers? struct PendingManifestRotation { var authedDevice: AuthedDevice var masterKeySource: StorageService.MasterKeySource var continuations: [CheckedContinuation] var mode: ManifestRotationMode } var pendingManifestRotation: PendingManifestRotation? var hasPendingCleanup = false struct PendingBackup { // Ideally, we instead have the entire StorageServiceManager class be // instantiated with the necessary context to make authenticated requests. // This is a middle ground between the current world (implicit auth we grab // from tsAccountManager) and explicit auth management. var authedDevice: AuthedDevice var masterKeySource: StorageService.MasterKeySource } var pendingBackup: PendingBackup? var pendingBackupTimer: Timer? struct PendingRestore { var authedDevice: AuthedDevice var masterKeySource: StorageService.MasterKeySource var isRunningViaCron: Bool var futures: [Future] } var pendingRestore: PendingRestore? var pendingMutations = PendingMutations() /// If set, contains the Error from the most recent restore request. If /// it's nil, we've either (a) not yet attempted a restore in this /// process; or (b) completed the most recent restore successfully. var mostRecentRestoreError: Error? var pendingRestoreCompletionFutures = [Future]() var isRunningOperation = false var onSteadyState = [NSObject: Monitor.Continuation]() } private let managerState = AtomicValue(ManagerState(), lock: .init()) private func updateManagerState(block: (inout ManagerState) -> Void) { Monitor.updateAndNotify( in: managerState, block: { block(&$0) startNextOperationIfNeeded(&$0) }, conditions: steadyStateCondition, ) } private func startNextOperationIfNeeded(_ managerState: inout ManagerState) { guard !managerState.isRunningOperation else { // Already running an operation -- we'll start the next when it finishes. return } guard let (nextOperation, cleanupBlock) = popNextOperation(&managerState) else { // There's nothing we need to do, so don't start any operation. return } // Run the operation & check again when it's done. managerState.isRunningOperation = true Task { let result = await Result { try await nextOperation() } self.finishOperation(cleanupBlock: { cleanupBlock?(&$0, { switch result { case .success(()): nil case .failure(let error): error } }()) }) } } private func popNextOperation(_ managerState: inout ManagerState) -> (() async throws -> Void, ((inout ManagerState, (any Error)?) -> Void)?)? { if let pendingManifestRotation = managerState.pendingManifestRotation { managerState.pendingManifestRotation = nil func resumeContinuations(_ error: Error?) { for continuation in pendingManifestRotation.continuations { if let error { continuation.resume(throwing: error) } else { continuation.resume() } } } if let rotateManifestOperation = buildOperation( managerState: managerState, mode: .rotateManifest(mode: pendingManifestRotation.mode), authedDevice: pendingManifestRotation.authedDevice, masterKeySource: pendingManifestRotation.masterKeySource, ) { let cleanupBlock: ((inout ManagerState, (any Error)?) -> Void) = { _, error in resumeContinuations(error) } return (rotateManifestOperation, cleanupBlock) } else { /// Resume the continuations, but don't return `nil` since there /// may be other operations we can pop instead. resumeContinuations(OWSAssertionError("Failed to build rotate manifest operation!")) } } if managerState.pendingMutations.hasChanges { let pendingMutations = managerState.pendingMutations managerState.pendingMutations = PendingMutations() return (StorageServiceOperation.recordPendingMutations(pendingMutations), nil) } if managerState.hasPendingCleanup { managerState.hasPendingCleanup = false let cleanUpOperation = buildOperation( managerState: managerState, mode: .cleanUpUnknownData, authedDevice: .implicit, masterKeySource: .implicit, ) if let cleanUpOperation { return (cleanUpOperation, nil) } } if let pendingRestore = managerState.pendingRestore { managerState.pendingRestore = nil managerState.mostRecentRestoreError = nil let restoreOperation = buildOperation( managerState: managerState, mode: .restoreOrCreate(isRunningViaCron: pendingRestore.isRunningViaCron), authedDevice: pendingRestore.authedDevice, masterKeySource: pendingRestore.masterKeySource, ) if let restoreOperation { return ({ do { try await restoreOperation() pendingRestore.futures.forEach { $0.resolve() } } catch { pendingRestore.futures.forEach { $0.reject(error) } throw error } }, { $0.mostRecentRestoreError = $1 }) } } if !managerState.pendingRestoreCompletionFutures.isEmpty { let pendingRestoreCompletionFutures = managerState.pendingRestoreCompletionFutures managerState.pendingRestoreCompletionFutures = [] let mostRecentRestoreError = managerState.mostRecentRestoreError return ({ pendingRestoreCompletionFutures.forEach { if let mostRecentRestoreError { $0.reject(mostRecentRestoreError) } else { $0.resolve(()) } } }, nil) } if let pendingBackup = managerState.pendingBackup { managerState.pendingBackup = nil let backupOperation = buildOperation( managerState: managerState, mode: .backup, authedDevice: pendingBackup.authedDevice, masterKeySource: pendingBackup.masterKeySource, ) if let backupOperation { return (backupOperation, nil) } } return nil } private func buildOperation( managerState: ManagerState, mode: StorageServiceOperation.Mode, authedDevice: AuthedDevice, masterKeySource: StorageService.MasterKeySource, ) -> (() async throws -> Void)? { let localIdentifiers: LocalIdentifiers let isPrimaryDevice: Bool switch authedDevice { case .explicit(let explicit): localIdentifiers = explicit.localIdentifiers isPrimaryDevice = explicit.isPrimaryDevice case .implicit: // Under the new reg flow, we will sync kbs keys before being fully ready with // ts account manager auth set up. skip if so. let tsAccountManager = DependenciesBridge.shared.tsAccountManager guard let registeredState = try? tsAccountManager.registeredStateWithMaybeSneakyTransaction() else { Logger.info("Skipping storage service operation with implicit auth during registration.") return nil } // The `isRegisteredAndReady` property only returns true when // `LocalIdentifiers` are ready on `TSAccountManager`. These should have // been provided to this object before we reach this point. guard let implicitLocalIdentifiers = managerState.localIdentifiers else { owsFailDebug("Trying to perform storage service operation without any identifiers.") return nil } localIdentifiers = implicitLocalIdentifiers isPrimaryDevice = registeredState.isPrimary } return { try await StorageServiceOperation( mode: mode, localIdentifiers: localIdentifiers, isPrimaryDevice: isPrimaryDevice, authedDevice: authedDevice, masterKeySource: masterKeySource, ).run() } } private func finishOperation(cleanupBlock: (inout ManagerState) -> Void) { updateManagerState { managerState in cleanupBlock(&managerState) managerState.isRunningOperation = false } } // MARK: - Pending Mutations private func updatePendingMutations(block: (inout PendingMutations) -> Void) { updateManagerState { managerState in block(&managerState.pendingMutations) // If we've made any changes, schedule a backup for the near future. This // provides an interval during which pending mutations can be coalesced. if managerState.pendingMutations.hasChanges, managerState.pendingBackupTimer == nil { managerState.pendingBackupTimer = startBackupTimer() } } } public func recordPendingUpdates(updatedRecipientUniqueIds: [RecipientUniqueId]) { if updatedRecipientUniqueIds.isEmpty { return } Logger.info("Recording pending update for recipientUniqueIds: \(updatedRecipientUniqueIds)") updatePendingMutations { $0.updatedRecipientUniqueIds.formUnion(updatedRecipientUniqueIds) } } public func recordPendingUpdates(updatedAddresses: [SignalServiceAddress]) { if updatedAddresses.isEmpty { return } Logger.info("Recording pending update for addresses: \(updatedAddresses)") updatePendingMutations { $0.updatedServiceIds.formUnion(updatedAddresses.lazy.compactMap({ $0.serviceId })) } } public func recordPendingUpdates(updatedGroupV2MasterKeys: [GroupMasterKey]) { updatePendingMutations { $0.updatedGroupV2MasterKeys.formUnion(updatedGroupV2MasterKeys.map { $0.serialize() }) } } @objc public func recordPendingUpdates(updatedStoryDistributionListIds: [Data]) { updatePendingMutations { $0.updatedStoryDistributionListIds.formUnion(updatedStoryDistributionListIds) } } public func recordPendingUpdates(callLinkRootKeys: [CallLinkRootKey]) { updatePendingMutations { $0.updatedCallLinkRootKeys.formUnion(callLinkRootKeys.lazy.map(\.bytes)) } } public func recordPendingLocalAccountUpdates() { Logger.info("Recording pending local account updates") updatePendingMutations { $0.updatedLocalAccount = true } } // MARK: - Actions @discardableResult public func restoreOrCreateManifestIfNecessary( authedDevice: AuthedDevice, masterKeySource: StorageService.MasterKeySource, ) -> Promise { return _restoreOrCreateManifestIfNecessary( authedDevice: authedDevice, masterKeySource: masterKeySource, isRunningViaCron: false, ) } private func _restoreOrCreateManifestIfNecessary( authedDevice: AuthedDevice, masterKeySource: StorageService.MasterKeySource, isRunningViaCron: Bool, ) -> Promise { let (promise, future) = Promise.pending() updateManagerState { managerState in var pendingRestore = managerState.pendingRestore ?? .init( authedDevice: .implicit, masterKeySource: .implicit, isRunningViaCron: false, futures: [], ) pendingRestore.futures.append(future) pendingRestore.authedDevice = authedDevice.orIfImplicitUse(pendingRestore.authedDevice) pendingRestore.masterKeySource = masterKeySource.orIfImplicitUse(pendingRestore.masterKeySource) pendingRestore.isRunningViaCron = isRunningViaCron || pendingRestore.isRunningViaCron managerState.pendingRestore = pendingRestore } return promise } public func rotateManifest( mode: ManifestRotationMode, authedDevice: AuthedDevice, ) async throws { try await withCheckedThrowingContinuation { continuation in updateManagerState { managerState in var pendingRotation = managerState.pendingManifestRotation ?? .init( authedDevice: .implicit, masterKeySource: .implicit, continuations: [], mode: mode, ) pendingRotation.continuations.append(continuation) pendingRotation.authedDevice = authedDevice.orIfImplicitUse(pendingRotation.authedDevice) pendingRotation.mode = pendingRotation.mode.mergeByPrecedence(mode) managerState.pendingManifestRotation = pendingRotation } } } public func backupPendingChanges(authedDevice: AuthedDevice) { updateManagerState { managerState in var pendingBackup = managerState.pendingBackup ?? .init(authedDevice: .implicit, masterKeySource: .implicit) pendingBackup.authedDevice = authedDevice.orIfImplicitUse(pendingBackup.authedDevice) managerState.pendingBackup = pendingBackup if let pendingBackupTimer = managerState.pendingBackupTimer { DispatchQueue.main.async { pendingBackupTimer.invalidate() } managerState.pendingBackupTimer = nil } } } public func waitForPendingRestores() async throws { let (promise, future) = Promise.pending() updateManagerState { managerState in managerState.pendingRestoreCompletionFutures.append(future) } try await promise.awaitableWithUncooperativeCancellationHandling() } private let steadyStateCondition = Monitor.Condition( isSatisfied: { !$0.isRunningOperation && $0.pendingBackupTimer == nil }, waiters: \.onSteadyState, ) public func waitForSteadyState() async throws(CancellationError) { try await Monitor.waitForCondition(steadyStateCondition, in: managerState) } public func resetLocalData(transaction: DBWriteTransaction) { Logger.info("Resetting local storage service data.") StorageServiceOperation.keyValueStore.removeAll(transaction: transaction) } private func cleanUpUnknownData() { updateManagerState { managerState in managerState.hasPendingCleanup = true } } // MARK: - Backup Scheduling private static var backupDebounceInterval: TimeInterval = 0.2 // Schedule a one-time backup. By default, this will happen `backupDebounceInterval` // seconds after the first pending change is recorded. private func startBackupTimer() -> Timer { let timer = Timer( timeInterval: StorageServiceManagerImpl.backupDebounceInterval, target: self, selector: #selector(self.backupTimerFired(_:)), userInfo: nil, repeats: false, ) DispatchQueue.main.async { RunLoop.current.add(timer, forMode: .default) } return timer } @objc private func backupTimerFired(_ timer: Timer) { AssertIsOnMainThread() backupPendingChanges(authedDevice: .implicit) } // MARK: - Cleanup private func cleanUpDeletedCallLinks() async { let callLinkStore = DependenciesBridge.shared.callLinkStore let db = DependenciesBridge.shared.db let deletionThresholdMs = Date.ows_millisecondTimestamp() - RemoteConfig.current.messageQueueTimeMs let callLinkRecords = db.read { tx in callLinkStore.fetchWhere(adminDeletedAtTimestampMsIsLessThan: deletionThresholdMs, tx: tx) } if !callLinkRecords.isEmpty { Logger.info("Cleaning up \(callLinkRecords.count) call links that were deleted a while ago.") await db.awaitableWrite { tx in for callLinkRecord in callLinkRecords { callLinkStore.deleteIfPossible(callLinkRecord, tx: tx) } } recordPendingUpdates(callLinkRootKeys: callLinkRecords.map(\.rootKey)) } } } // MARK: - PendingMutations private struct PendingMutations { var updatedRecipientUniqueIds = Set() var updatedServiceIds = Set() var updatedGroupV2MasterKeys = Set() var updatedStoryDistributionListIds = Set() var updatedCallLinkRootKeys = Set() var updatedLocalAccount = false var hasChanges: Bool { return updatedLocalAccount || !updatedRecipientUniqueIds.isEmpty || !updatedServiceIds.isEmpty || !updatedGroupV2MasterKeys.isEmpty || !updatedStoryDistributionListIds.isEmpty || !updatedCallLinkRootKeys.isEmpty } } // MARK: - class StorageServiceOperation { private static let migrationStore: KeyValueStore = KeyValueStore(collection: "StorageServiceMigration") private static let versionKey = "Version" fileprivate static var keyValueStore: KeyValueStore { return KeyValueStore(collection: "kOWSStorageServiceOperation_IdentifierMap") } // MARK: - fileprivate enum Mode { case rotateManifest(mode: StorageServiceManager.ManifestRotationMode) case backup case restoreOrCreate(isRunningViaCron: Bool) case cleanUpUnknownData } private let mode: Mode private let localIdentifiers: LocalIdentifiers private let isPrimaryDevice: Bool private let authedDevice: AuthedDevice private let masterKeySource: StorageService.MasterKeySource private var masterKey: MasterKey! private var authedAccount: AuthedAccount { authedDevice.authedAccount } fileprivate init( mode: Mode, localIdentifiers: LocalIdentifiers, isPrimaryDevice: Bool, authedDevice: AuthedDevice, masterKeySource: StorageService.MasterKeySource, ) { self.mode = mode self.localIdentifiers = localIdentifiers self.isPrimaryDevice = isPrimaryDevice self.authedDevice = authedDevice self.masterKeySource = masterKeySource } // MARK: - Run func run() async throws { return try await Retry.performWithBackoff( maxAttempts: 4, block: { try await _run() }, ) } private func _run() async throws { let accountKeyStore = DependenciesBridge.shared.accountKeyStore let databaseStorage = SSKEnvironment.shared.databaseStorageRef let tsAccountManager = DependenciesBridge.shared.tsAccountManager let (currentStateIfRotatingManifest, masterKey) = databaseStorage.read { tx in let state: State? switch mode { case .rotateManifest: state = State.current(transaction: tx) case .backup, .restoreOrCreate, .cleanUpUnknownData: state = nil } var masterKey: MasterKey? switch masterKeySource { case .explicit(let keyData): masterKey = keyData case .implicit: masterKey = accountKeyStore.getMasterKey(tx: tx) } if !isPrimaryDevice, accountKeyStore.isWaitingForKeysSyncMessage(tx: tx) { // We hit a failure and are waiting for a "new" AEP/MasterKey. masterKey = nil } return (state, masterKey) } guard let masterKey else { if !isPrimaryDevice, tsAccountManager.registrationStateWithMaybeSneakyTransaction.isRegistered { await sendKeysSyncRequestMessageIfNeeded() } else { // We're either not registered, or a primary. Either way, // we don't have keys, or a means to get them, so do nothing. // We'll try a fresh restore once the keys are set. Logger.info("Skipping storage service operation due to missing master key.") } return } self.masterKey = masterKey switch mode { case .rotateManifest(let mode): guard isPrimaryDevice else { throw OWSAssertionError("Can only rotate manifest from primary device!") } let nextManifestVersion = currentStateIfRotatingManifest!.manifestVersion + 1 switch mode { case .preservingRecordsIfPossible: try await createNewManifestPreservingRecords(version: nextManifestVersion) case .alsoRotatingRecords: try await createNewManifestAndRecords(version: nextManifestVersion) } case .backup: try await backupPendingChanges() case .restoreOrCreate(let isRunningViaCron): try await restoreOrCreateManifestIfNecessary() // If we weren't triggered via Cron, we can report the result to Cron to // avoid fetching when unnecessary. if !isRunningViaCron { await databaseStorage.awaitableWrite { tx in StorageServiceManagerImpl.updateRestoreManifestCronDate(tx: tx) } } case .cleanUpUnknownData: await cleanUpUnknownData() } } private func sendKeysSyncRequestMessageIfNeeded() async { owsPrecondition(!isPrimaryDevice) let accountKeyStore = DependenciesBridge.shared.accountKeyStore let databaseStorage = SSKEnvironment.shared.databaseStorageRef let syncManager = SSKEnvironment.shared.syncManagerRef await databaseStorage.awaitableWrite { tx in if accountKeyStore.isWaitingForKeysSyncMessage(tx: tx) { // We've already requested keys; if the request got lost, we can rely on // the periodic keys sync message. return } // This is a linked device, and keys are missing. There's nothing that can // be done until we receive new keys, so send a key sync message. syncManager.sendKeysSyncRequestMessage(transaction: tx) accountKeyStore.setWaitingForKeysSyncMessage(true, tx: tx) } } // MARK: - Mark Pending Changes fileprivate static func recordPendingMutations(_ pendingMutations: PendingMutations) -> (() async -> Void) { return { await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { recordPendingMutations(pendingMutations, transaction: $0) } } } private static func recordPendingMutations( _ pendingMutations: PendingMutations, transaction: DBWriteTransaction, ) { var state = State.current(transaction: transaction) recordPendingMutations(pendingMutations, in: &state, transaction: transaction) state.save(transaction: transaction) } private static func recordPendingMutations( _ pendingMutations: PendingMutations, in state: inout State, transaction tx: DBWriteTransaction, ) { // Coalesce addresses to account IDs. There may be duplicates among the // addresses and account IDs. var allRecipientUniqueIds = Set() allRecipientUniqueIds.formUnion(pendingMutations.updatedRecipientUniqueIds) let recipientFetcher = DependenciesBridge.shared.recipientFetcher allRecipientUniqueIds.formUnion(pendingMutations.updatedServiceIds.lazy.compactMap { (serviceId: ServiceId) -> String? in return recipientFetcher.fetchOrCreate(serviceId: serviceId, tx: tx).uniqueId }) // Then, update State with all these pending mutations. Logger.info( """ Recording pending mutations (\ Account: \(pendingMutations.updatedLocalAccount); \ Contacts: \(allRecipientUniqueIds.count); \ GV2: \(pendingMutations.updatedGroupV2MasterKeys.count); \ DLists: \(pendingMutations.updatedStoryDistributionListIds.count); \ CLinks: \(pendingMutations.updatedCallLinkRootKeys.count)) """, ) if pendingMutations.updatedLocalAccount { state.localAccountChangeState = .updated } allRecipientUniqueIds.forEach { state.accountIdChangeMap[$0] = .updated } pendingMutations.updatedGroupV2MasterKeys.forEach { state.groupV2ChangeMap[$0] = .updated } pendingMutations.updatedStoryDistributionListIds.forEach { state.storyDistributionListChangeMap[$0] = .updated } pendingMutations.updatedCallLinkRootKeys.forEach { state.callLinkRootKeyChangeMap[$0] = .updated } } private func normalizePendingMutations(in state: inout State, transaction: DBReadTransaction) { // If we didn't change any AccountIds, then we definitely don't have a // match for the `if` check which follows & can avoid the query. if state.accountIdChangeMap.isEmpty { return } let localAci = localIdentifiers.aci let recipientIdFinder = DependenciesBridge.shared.recipientIdFinder let localRecipientUniqueId = try? recipientIdFinder.recipientUniqueId(for: localAci, tx: transaction)?.get() // If we updated a recipient, and if that recipient is ourselves, move the // update over to the Account record type. if let localRecipientUniqueId, state.accountIdChangeMap.removeValue(forKey: localRecipientUniqueId) != nil { state.localAccountChangeState = .updated } } // MARK: - Backup private func backupPendingChanges() async throws { var updatedItems: [StorageService.StorageItem] = [] var deletedIdentifiers: [StorageService.StorageIdentifier] = [] func updateRecord( state: inout State, localId: StateUpdater.IdType, changeState: State.ChangeState, stateUpdater: StateUpdater, needsInterceptForMigration: Bool, transaction: DBReadTransaction, ) { let recordUpdater = stateUpdater.recordUpdater let newRecord: StateUpdater.RecordType? switch changeState { case .unchanged: return case .updated: // We need to preserve the unknown fields (if any) so we don't blow away // data written by newer versions of the app. let recordWithUnknownFields = stateUpdater.recordWithUnknownFields(for: localId, in: state) let unknownFields = recordWithUnknownFields.flatMap { recordUpdater.unknownFields(for: $0) } newRecord = recordUpdater.buildRecord( for: localId, unknownFields: unknownFields, transaction: transaction, ) case .deleted: newRecord = nil } // Note: We might not have a `newRecord` even if the status is `.updated`. // The local value may have been deleted before this operation started. // If there is an existing identifier for this record, mark it for // deletion. We generate a fresh identifier every time a record changes, so // we always start by deleting the old record. if let oldStorageIdentifier = stateUpdater.storageIdentifier(for: localId, in: state) { deletedIdentifiers.append(oldStorageIdentifier) } // Clear out all of the state for the old record. We'll re-add the state if // we have a new record to save. stateUpdater.setStorageIdentifier(nil, for: localId, in: &state) stateUpdater.setRecordWithUnknownFields(nil, for: localId, in: &state) // We've deleted the old record. If we don't have a `newRecord`, stop. guard var newRecord else { return } if needsInterceptForMigration { newRecord = StorageServiceUnknownFieldMigrator.interceptLocalManifestBeforeUploading( record: newRecord, tx: transaction, ) } if recordUpdater.unknownFields(for: newRecord) != nil { stateUpdater.setRecordWithUnknownFields(newRecord, for: localId, in: &state) } let storageItem = recordUpdater.buildStorageItem(for: newRecord) stateUpdater.setStorageIdentifier(storageItem.identifier, for: localId, in: &state) updatedItems.append(storageItem) } func updateRecords( state: inout State, stateUpdater: StateUpdater, needsInterceptForMigration: Bool, transaction: DBReadTransaction, ) { stateUpdater.resetAndEnumerateChangeStates(in: &state) { mutableState, localId, changeState in updateRecord( state: &mutableState, localId: localId, changeState: changeState, stateUpdater: stateUpdater, needsInterceptForMigration: needsInterceptForMigration, transaction: transaction, ) } } var state: State = SSKEnvironment.shared.databaseStorageRef.read { transaction in var state = State.current(transaction: transaction) normalizePendingMutations(in: &state, transaction: transaction) let needsInterceptForMigration = StorageServiceUnknownFieldMigrator.shouldInterceptLocalManifestBeforeUploading(tx: transaction) updateRecords( state: &state, stateUpdater: buildAccountUpdater(), needsInterceptForMigration: needsInterceptForMigration, transaction: transaction, ) updateRecords( state: &state, stateUpdater: buildContactUpdater(), needsInterceptForMigration: needsInterceptForMigration, transaction: transaction, ) updateRecords( state: &state, stateUpdater: buildGroupV1Updater(), needsInterceptForMigration: needsInterceptForMigration, transaction: transaction, ) updateRecords( state: &state, stateUpdater: buildGroupV2Updater(), needsInterceptForMigration: needsInterceptForMigration, transaction: transaction, ) updateRecords( state: &state, stateUpdater: buildStoryDistributionListUpdater(), needsInterceptForMigration: needsInterceptForMigration, transaction: transaction, ) updateRecords( state: &state, stateUpdater: buildCallLinkUpdater(), needsInterceptForMigration: needsInterceptForMigration, transaction: transaction, ) return state } // If we have no pending changes, we have nothing left to do guard !deletedIdentifiers.isEmpty || !updatedItems.isEmpty else { return } // If we have invalid identifiers, we intentionally exclude them from the // prior check. We've already ignored them, so we can clean them up as part // of the next unrelated change. let invalidIdentifiers = state.invalidIdentifiers state.invalidIdentifiers = [] // Bump the manifest version state.manifestVersion += 1 let manifest = buildManifestRecord( manifestVersion: state.manifestVersion, manifestRecordIkm: state.manifestRecordIkm, identifiers: state.allIdentifiers, ) Logger.info( """ Backing up pending changes with proposed manifest version \(state.manifestVersion) (\ New: \(updatedItems.count), \ Deleted: \(deletedIdentifiers.count), \ Invalid/Missing: \(invalidIdentifiers.count), \ Total: \(state.allIdentifiers.count)) """, ) do { try await StorageService.updateManifest( manifest, newItems: updatedItems, deletedIdentifiers: deletedIdentifiers + invalidIdentifiers, deleteAllExistingRecords: false, masterKey: masterKey, chatServiceAuth: authedAccount.chatServiceAuth, ) } catch StorageService.StorageError.conflictingManifest(let conflictingManifest) { guard conflictingManifest.version >= state.manifestVersion else { throw OWSAssertionError("Received conflicting manifest with unexpected version! Conflicting: \(conflictingManifest.version); proposed: \(state.manifestVersion)") } // Throw away all our work, resolve conflicts, and try again. try await self.mergeLocalManifest( withRemoteManifest: conflictingManifest, mergeReason: .conflictBackingUp, ) return } catch StorageService.StorageError.manifestDecryptionFailed(let conflictingVersion) where isPrimaryDevice, StorageService.StorageError.manifestProtoDeserializationFailed(let conflictingVersion) where isPrimaryDevice { /// The remote manifest is invalid and conflicting, which is /// blocking us from doing a backup. Overwrite it. try await createNewManifestAndRecords(version: conflictingVersion + 1) return } Logger.info("Successfully updated to manifest version: \(state.manifestVersion)") // Successfully updated, store our changes. await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { transaction in state.save(clearConsecutiveConflicts: true, transaction: transaction) StorageServiceUnknownFieldMigrator.didWriteToStorageService(tx: transaction) } // Notify our other devices that the storage manifest has changed. await SSKEnvironment.shared.syncManagerRef.sendFetchLatestStorageManifestSyncMessage() } private func buildManifestRecord( manifestVersion: UInt64, manifestRecordIkm: Data?, identifiers identifiersParam: [StorageService.StorageIdentifier], ) -> StorageServiceProtoManifestRecord { let identifiers = StorageService.StorageIdentifier.deduplicate(identifiersParam) var manifestBuilder = StorageServiceProtoManifestRecord.builder(version: manifestVersion) if let manifestRecordIkm { owsAssertDebug( manifestRecordIkm.count == StorageService.ManifestRecordIkm.expectedLength, "Found manifest recordIkm with unexpected length! Who generated it?", ) manifestBuilder.setRecordIkm(manifestRecordIkm) } manifestBuilder.setKeys(identifiers.map { $0.buildRecord() }) let tsAccountManager = DependenciesBridge.shared.tsAccountManager if let deviceId = tsAccountManager.storedDeviceIdWithMaybeTransaction.ifValid { manifestBuilder.setSourceDevice(deviceId.uint32Value) } else { owsFailDebug("Can't sync with an invalid deviceId.") } return manifestBuilder.buildInfallibly() } // MARK: - Restore private func restoreOrCreateManifestIfNecessary() async throws { let state: State = SSKEnvironment.shared.databaseStorageRef.read { State.current(transaction: $0) } let greaterThanVersion: UInt64? = { // If we've been flagged to refetch the latest manifest, // don't specify our current manifest version otherwise // the server may return nothing because we've said we // already parsed it. if state.refetchLatestManifest { return nil } return state.manifestVersion }() do { switch try await StorageService.fetchLatestManifest( ifGreaterThanVersion: greaterThanVersion, masterKey: masterKey, chatServiceAuth: authedAccount.chatServiceAuth, ) { case .noExistingManifest: // There is no existing manifest, let's create one. return try await self.createNewManifestAndRecords(version: 1) case .noNewerManifest: // Our manifest version matches the server version, nothing to do here. return case .latestManifest(let manifest): // Our manifest is not the latest, merge in the latest copy. return try await self.mergeLocalManifest( withRemoteManifest: manifest, mergeReason: .fetchedLatest, ) } } catch StorageService.StorageError.manifestDecryptionFailed(let manifestVersion) where isPrimaryDevice, StorageService.StorageError.manifestProtoDeserializationFailed(let manifestVersion) where isPrimaryDevice { // If we succeeded to fetch the manifest but were unable to decrypt or // decode it, it likely means our keys changed or another device encrypted // a malformed value. If this is the primary device, throw everything away // and re-encrypt the social graph with the keys we have locally. Logger.warn("Manifest decryption/deserialization failed on primary, recreating manifest.") try await self.createNewManifestAndRecords(version: manifestVersion + 1) return } catch { switch error { case StorageService.StorageError.manifestDecryptionFailed(_) where !isPrimaryDevice: // If this is a linked device, give up and request the latest storage // service key from the primary device. Logger.warn("Manifest decryption failed on linked device; waiting for keys sync") await sendKeysSyncRequestMessageIfNeeded() default: break } throw error } } // MARK: - Creating new manifests private func createNewManifestPreservingRecords(version: UInt64) async throws { owsPrecondition(isPrimaryDevice) var state = SSKEnvironment.shared.databaseStorageRef.read { tx in State.current(transaction: tx) } state.manifestVersion = version guard let manifestRecordIkm = state.manifestRecordIkm else { /// It only makes sense to preserve records if they're encrypted /// differently from the manifest; which is to say, they use a /// `recordIkm`. If we have no `recordIkm`, we should only create /// a new manifest alongside all-new records. Logger.warn("Missing manifest recordIkm while trying to create new manifest preserving records. Pivoting to creating new manifest and records.") try await createNewManifestAndRecords(version: version) return } let manifest = buildManifestRecord( manifestVersion: version, manifestRecordIkm: manifestRecordIkm, identifiers: state.allIdentifiers, ) if let conflictingManifestVersion = try await createNewManifestAndSaveState( manifest, state: &state, newItems: [], deletedIdentifiers: [], deleteAllExistingRecords: false, ) { /// We hit a conflict, and consequently we can't be confident that /// the records we wanted to preserve can still be preserved. This /// indicates devices racing with unfortunate timing, and so should /// be a niche case. Since we know we need to create a new manifest, /// we can recover by recreating the manifest and records. Logger.warn("Got conflicting manifest version while trying to create new manifest preserving records. Pivoting to creating new manifest and records.") try await createNewManifestAndRecords(version: conflictingManifestVersion + 1) } } private func createNewManifestAndRecords(version: UInt64) async throws { owsPrecondition(isPrimaryDevice) var allItems: [StorageService.StorageItem] = [] var state = State() state.manifestVersion = version SSKEnvironment.shared.databaseStorageRef.read { transaction in /// Generate a new `recordIkm` each time we create a new manifest. /// The records recreated alongside this manifest will be encrypted /// using this newly- generated value. state.manifestRecordIkm = StorageService.ManifestRecordIkm.generateForNewManifest() let shouldInterceptForMigration = StorageServiceUnknownFieldMigrator.shouldInterceptLocalManifestBeforeUploading(tx: transaction) func createRecord( localId: StateUpdater.IdType, stateUpdater: StateUpdater, ) { let recordUpdater = stateUpdater.recordUpdater let newRecord = recordUpdater.buildRecord( for: localId, unknownFields: nil, transaction: transaction, ) guard var newRecord else { return } if shouldInterceptForMigration { newRecord = StorageServiceUnknownFieldMigrator.interceptLocalManifestBeforeUploading( record: newRecord, tx: transaction, ) } let storageItem = recordUpdater.buildStorageItem(for: newRecord) stateUpdater.setStorageIdentifier(storageItem.identifier, for: localId, in: &state) allItems.append(storageItem) } let accountUpdater = buildAccountUpdater() let contactUpdater = buildContactUpdater() let recipientDatabaseTable = DependenciesBridge.shared.recipientDatabaseTable recipientDatabaseTable.enumerateAll(tx: transaction) { recipient in // There's only one recipient that can match our ACI (the column has a // UNIQUE constraint). If, for some reason, our PNI or phone number shows // up elsewhere, we'll try to create a contact record for that identifier, // and we'll fail because it's our own identifier. If we fed *every* match // for a local identifier into the account updater, we might create // multiple account records. if self.localIdentifiers.aci == recipient.aci { createRecord(localId: (), stateUpdater: accountUpdater) } else { createRecord(localId: recipient.uniqueId, stateUpdater: contactUpdater) } } let groupV2Updater = buildGroupV2Updater() let storyDistributionListUpdater = buildStoryDistributionListUpdater() TSThread.anyEnumerate(transaction: transaction) { thread, _ in if let groupThread = thread as? TSGroupThread, let groupModel = groupThread.groupModel as? TSGroupModelV2 { let masterKey: GroupMasterKey do { masterKey = try groupModel.masterKey() } catch { owsFailDebug("Invalid group model \(error).") return } createRecord(localId: masterKey.serialize(), stateUpdater: groupV2Updater) } else if let storyThread = thread as? TSPrivateStoryThread { guard let distributionListId = storyThread.distributionListIdentifier else { owsFailDebug("Missing distribution list id for story thread \(thread.logString)") return } createRecord(localId: distributionListId, stateUpdater: storyDistributionListUpdater) } } // Deleted Private Stories DependenciesBridge.shared.privateStoryThreadDeletionManager .allDeletedIdentifiers(tx: transaction) .forEach { deletedDistributionListIdentifier in createRecord( localId: deletedDistributionListIdentifier, stateUpdater: storyDistributionListUpdater, ) } let callLinkUpdater = buildCallLinkUpdater() let callLinkStore = callLinkUpdater.recordUpdater.callLinkStore callLinkStore.fetchAll(tx: transaction).forEach { createRecord(localId: $0.rootKey.bytes, stateUpdater: callLinkUpdater) } } let identifiers = allItems.map { $0.identifier } let manifest = buildManifestRecord( manifestVersion: state.manifestVersion, manifestRecordIkm: state.manifestRecordIkm, identifiers: identifiers, ) // We want to do this only when absolutely necessary as it's an expensive // query on the server. When we set this flag, the server will query and // purge any orphaned records. let shouldDeletePreviousRecords = version > 1 if let conflictingManifestVersion = try await createNewManifestAndSaveState( manifest, state: &state, newItems: allItems, deletedIdentifiers: [], deleteAllExistingRecords: shouldDeletePreviousRecords, ) { /// We know affirmatively that we want to create a new manifest from /// the data on this device, so if we hit a conflict we'll bump the /// version number and try again (thereby overwriting whatever we /// conflicted with). let newManifestVersion = conflictingManifestVersion + 1 state.manifestVersion = newManifestVersion let manifest = { var builder = manifest.asBuilder() builder.setVersion(newManifestVersion) return builder.buildInfallibly() }() if try await createNewManifestAndSaveState( manifest, state: &state, newItems: allItems, deletedIdentifiers: [], deleteAllExistingRecords: true, ) != nil { throw OWSGenericError("Repeated conflicts trying to create a new manifest; giving up. What's going on?") } } } /// Creates a new manifest from the given parameters, and if successful /// persists the given state. /// /// - Returns /// `nil` if successful, or the version of the current remote manifest if /// updating the manifest results in a version conflict. private func createNewManifestAndSaveState( _ manifest: StorageServiceProtoManifestRecord, state: inout State, newItems: [StorageService.StorageItem], deletedIdentifiers: [StorageService.StorageIdentifier], deleteAllExistingRecords: Bool, ) async throws -> UInt64? { owsPrecondition(isPrimaryDevice) Logger.info("Creating a new manifest with manifest version: \(manifest.version).") let conflictingManifestVersion: UInt64 do { try await StorageService.updateManifest( manifest, newItems: newItems, deletedIdentifiers: deletedIdentifiers, deleteAllExistingRecords: deleteAllExistingRecords, masterKey: masterKey, chatServiceAuth: authedAccount.chatServiceAuth, ) /// We created a new manifest, so let's tell our other devices to go /// fetch it. await SSKEnvironment.shared.syncManagerRef.sendFetchLatestStorageManifestSyncMessage() /// Store our changes. await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { transaction in state.save(clearConsecutiveConflicts: true, transaction: transaction) StorageServiceUnknownFieldMigrator.didWriteToStorageService(tx: transaction) } return nil } catch StorageService.StorageError.conflictingManifest(let conflictingManifest) { /// This is weird, because we generally only create a new manifest /// when we know the existing manifest is broken. Somehow, between /// the time we found it broken and decided we needed to recreate /// and now, it became un-broken. /// /// This should never happen, so rather than trying to merge in the /// conflicting manifest and handling errors (such as those from /// fetching and decrypting storage items that may yet be broken) /// callers will see the conflicting version and overwrite whatever /// was in the mysteriously-fixed manifest. conflictingManifestVersion = conflictingManifest.version } catch StorageService.StorageError.manifestDecryptionFailed(let _conflictingManifestVersion), StorageService.StorageError.manifestProtoDeserializationFailed(let _conflictingManifestVersion) { /// This indicates that we found a conflicting remote manifest that /// we couldn't read. For example, maybe we're creating a new /// manifest in response to having rotated keys on this (primary) /// device, and one of our other devices updated the manifest using /// old keys. /// /// Regardless, we can't recover what's in this manifest, so instead /// we'll let callers see the conflicting version and overwrite /// whatever was in it. conflictingManifestVersion = _conflictingManifestVersion } return conflictingManifestVersion } // MARK: - Conflict Resolution private enum MergeLocalManifestReason { case fetchedLatest case conflictBackingUp } private func mergeLocalManifest( withRemoteManifest manifest: StorageServiceProtoManifestRecord, mergeReason: MergeLocalManifestReason, ) async throws { var state: State = await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { transaction in var state = State.current(transaction: transaction) normalizePendingMutations(in: &state, transaction: transaction) switch mergeReason { case .fetchedLatest: break case .conflictBackingUp: state.consecutiveConflicts += 1 state.save(transaction: transaction) } return state } // If we've tried many times in a row to resolve conflicts, something weird // is happening (potentially a bug on the service or a race with another // app). Give up and wait until the next backup runs. guard state.consecutiveConflicts <= StorageServiceOperation.maxConsecutiveConflicts else { owsFailDebug("unexpectedly have had numerous repeated conflicts") // Clear out the consecutive conflicts count so we can try again later. await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { transaction in state.save(clearConsecutiveConflicts: true, transaction: transaction) } throw OWSAssertionError("exceeded max consecutive conflicts, creating a new manifest") } let allManifestItems: Set = Set(manifest.keys.lazy.map { .init(data: $0.data, type: $0.type) }) // Calculate new or updated items by looking up the ids of any items we // don't know about locally. Since a new id is always generated after a // change, this reflects changes made since the last manifest version. var newOrUpdatedItems = Array(allManifestItems.subtracting(state.allIdentifiers)) // We also want to refetch any identifiers that we didn't know how to parse // before but now do know how to parse. These might not have gotten // updated, so we need to add them explicitly. for (keyType, unknownIdentifiers) in state.unknownIdentifiersTypeMap { guard Self.isKnownKeyType(keyType) else { continue } newOrUpdatedItems.append(contentsOf: unknownIdentifiers) } let localKeysCount = state.allIdentifiers.count Logger.info("\(manifest.logDescription); merging \(newOrUpdatedItems.count); \(localKeysCount) local; \(allManifestItems.count) remote") do { // First, fetch the local account record if it has been updated. We give this record // priority over all other records as it contains things like the user's configuration // that we want to update ASAP, especially when restoring after linking. try await { if let storageIdentifier = state.localAccountIdentifier, allManifestItems.contains(storageIdentifier) { return } let localAccountIdentifiers = newOrUpdatedItems.filter { $0.type == .account } assert(localAccountIdentifiers.count <= 1) guard let newLocalAccountIdentifier = localAccountIdentifiers.first else { owsFailDebug("remote manifest is missing local account, mark it for update") state.localAccountChangeState = .updated return } Logger.info("\(manifest.logDescription); merging account record") let item: StorageService.StorageItem? item = try await StorageService.fetchItems( for: [newLocalAccountIdentifier], manifest: manifest, masterKey: masterKey, chatServiceAuth: authedAccount.chatServiceAuth, ).first guard let item else { // This can happen in normal use if between fetching the manifest and starting the item // fetch a linked device has updated the manifest. state.localAccountChangeState = .updated return } guard let accountRecord = item.accountRecord else { throw OWSAssertionError("unexpected item type for account identifier") } await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { transaction in self.mergeRecord( accountRecord, identifier: item.identifier, state: &state, stateUpdater: self.buildAccountUpdater(), transaction: transaction, ) state.save(transaction: transaction) } // Remove any account record identifiers from the new or updated basket. We've processed them. newOrUpdatedItems.removeAll { localAccountIdentifiers.contains($0) } }() // Clean up our unknown identifiers type map to only reflect identifiers // that still exist in the manifest. If we find more unknown identifiers in // any batch, we'll add them in `fetchAndMergeItemsInBatches`. state.unknownIdentifiersTypeMap = state.unknownIdentifiersTypeMap .mapValues { unknownIdentifiers in Array(allManifestItems.intersection(unknownIdentifiers)) } .filter { recordType, unknownIdentifiers in !unknownIdentifiers.isEmpty } // Then, fetch the remaining items in the manifest and resolve any conflicts as appropriate. try await self.fetchAndMergeItemsInBatches(identifiers: newOrUpdatedItems, manifest: manifest, state: &state) let storageServiceManager = SSKEnvironment.shared.storageServiceManagerRef await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { transaction in // Update the manifest version to reflect the remote version we just restored to state.manifestVersion = manifest.version /// Update the manifest `recordIkm` to reflect the remote one we /// just merged in. We need to save this, since it should only /// change if we are fully recreating the manifest and /// reuploading all records. state.manifestRecordIkm = manifest.recordIkm if isPrimaryDevice, let localManifestRecordIkm = state.manifestRecordIkm, let remoteManifestRecordIkm = manifest.recordIkm { owsAssertDebug( localManifestRecordIkm == remoteManifestRecordIkm, "Primary unexpectedly found a remote manifest recordIkm that doesn't match the local one. Who rotated it?", ) } // We just did a successful manifest fetch and restore, so we no longer need to refetch it state.refetchLatestManifest = false // We fetched all the previously unknown identifiers, so we don't need to // fetch them again in the future unless they're updated. state.unknownIdentifiersTypeMap = state.unknownIdentifiersTypeMap .filter { keyType, _ in !Self.isKnownKeyType(keyType) } // Save invalid identifiers to remove during the write operation. // // We don't remove them immediately because we've already ignored them, and // we want to avoid fighting against another device that may put them back // when we remove them. Instead, we simply keep track of them so that we // can delete them during our next mutation. // // We may have invalid identifiers for three reasons: // // (1) We got back an .invalid merge result, meaning we didn't process a // storage item. As a result, our local state won't reference it. // // (2) There are two storage items (with different storage identifiers) // whose contents refer to the same thing (eg, group, story). In this case, // the latter will replace the former, and the former will be orphaned. // // (3) The identifier is present in the manifest, but the corresponding // item can't be fetched. When this happens, the most likely explanation is // that our manifest is out of date. The next time we try to write, we'll // get a conflict, merge the latest manifest, see that it no longer // references this identifier, and remove it from `invalidIdentifiers`. (In // the less common case where the latest manifest does refer to a // non-existent identifier, this device will take care of fixing up the // manifest to remove the reference.) state.invalidIdentifiers = allManifestItems.subtracting(state.allIdentifiers) let invalidIdentifierCount = state.invalidIdentifiers.count // Mark any orphaned records as pending update so we re-add them to the manifest. var orphanedGroupV2Count = 0 for (groupMasterKey, identifier) in state.groupV2MasterKeyToIdentifierMap where !allManifestItems.contains(identifier) { state.groupV2ChangeMap[groupMasterKey] = .updated orphanedGroupV2Count += 1 } var orphanedStoryDistributionListCount = 0 for (dlistIdentifier, storageIdentifier) in state.storyDistributionListIdentifierToStorageIdentifierMap where !allManifestItems.contains(storageIdentifier) { state.storyDistributionListChangeMap[dlistIdentifier] = .updated orphanedStoryDistributionListCount += 1 } var orphanedCallLinkRootKeyCount = 0 for (callLinkRootKeyData, storageIdentifier) in state.callLinkRootKeyToStorageIdentifierMap where !allManifestItems.contains(storageIdentifier) { // If another client removes a deleted call link, allow it. let callLinkStore = DependenciesBridge.shared.callLinkStore guard let callLinkRootKey = try? CallLinkRootKey(callLinkRootKeyData), let callLinkRecord = callLinkStore.fetch(roomId: callLinkRootKey.deriveRoomId(), tx: transaction), callLinkRecord.adminPasskey != nil else { continue } state.callLinkRootKeyChangeMap[callLinkRootKeyData] = .updated orphanedCallLinkRootKeyCount += 1 } var orphanedAccountCount = 0 let currentDate = Date() let recipientDatabaseTable = DependenciesBridge.shared.recipientDatabaseTable for (recipientUniqueId, identifier) in state.accountIdToIdentifierMap where !allManifestItems.contains(identifier) { // Only consider registered recipients as orphaned. If another client // removes an unregistered recipient, allow it. guard let recipient = recipientDatabaseTable.fetchRecipient(uniqueId: recipientUniqueId, tx: transaction), let storageServiceContact = StorageServiceContact(recipient), storageServiceContact.shouldBeInStorageService(currentDate: currentDate, remoteConfig: .current), storageServiceContact.registrationStatus(currentDate: currentDate, remoteConfig: .current) == .registered else { continue } state.accountIdChangeMap[recipientUniqueId] = .updated orphanedAccountCount += 1 } let pendingChangesCount = ( state.accountIdChangeMap.count + state.groupV2ChangeMap.count + state.storyDistributionListChangeMap.count + state.callLinkRootKeyChangeMap.count, ) Logger.info( """ \(manifest.logDescription) finished; \ \(pendingChangesCount) pending updates; \ \(invalidIdentifierCount) missing/invalid ids; \ \(orphanedAccountCount) orphaned accounts; \ \(orphanedGroupV2Count) orphaned gv2; \ \(orphanedStoryDistributionListCount) orphaned dlists; \ \(orphanedCallLinkRootKeyCount) orphaned clinks """, ) state.save(clearConsecutiveConflicts: true, transaction: transaction) switch mergeReason { case .fetchedLatest: break case .conflictBackingUp: // If we're merging because we had a conflict while backing // up, reattempt that backup now that we've merged. storageServiceManager.backupPendingChanges(authedDevice: self.authedDevice) } } } catch let storageError as StorageService.StorageError { // If we succeeded to fetch the records but were unable to decrypt any of them, // it likely means our keys changed. if case .itemDecryptionFailed = storageError { // If this is the primary device, throw everything away and re-encrypt // the social graph with the keys we have locally. if self.isPrimaryDevice { Logger.warn("Item decryption failed, recreating manifest.") try await self.createNewManifestAndRecords(version: manifest.version + 1) return } Logger.warn("Item decryption failed; waiting for keys sync") // If this is a linked device, give up and request the latest storage // service key from the primary device. await sendKeysSyncRequestMessageIfNeeded() } else if case .itemProtoDeserializationFailed = storageError, self.isPrimaryDevice { // If decryption succeeded but proto deserialization failed, we somehow ended up with // byte garbage in storage service. Our only recourse is to throw everything away and // re-encrypt the social graph with data we have locally. Logger.warn("Item deserialization failed, recreating manifest.") try await self.createNewManifestAndRecords(version: manifest.version + 1) return } throw storageError } } private static var itemsBatchSize: Int { CurrentAppContext().isNSE ? 256 : 1024 } private func fetchAndMergeItemsInBatches( identifiers: [StorageService.StorageIdentifier], manifest: StorageServiceProtoManifestRecord, state: inout State, ) async throws { var deferredItems = [StorageService.StorageItem]() for identifierBatch in identifiers.chunked(by: Self.itemsBatchSize) { let fetchedItems: [StorageService.StorageItem] fetchedItems = try await StorageService.fetchItems( for: Array(identifierBatch), manifest: manifest, masterKey: masterKey, chatServiceAuth: self.authedAccount.chatServiceAuth, ) // We process contacts with ACIs before those without ACIs. We do this to // ensure we process split operations first. If we don't, then we'll likely // try to re-populate the ACI based on our local state. var batchItems = [StorageService.StorageItem]() var batchDeferredItemCount = 0 for fetchedItem in fetchedItems { if let record = fetchedItem.contactRecord, StorageServiceContactRecordUpdater.shouldDeferMerge(record) { deferredItems.append(fetchedItem) batchDeferredItemCount += 1 } else { batchItems.append(fetchedItem) } } await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in self.mergeItems(batchItems, state: &state, tx: tx) } Logger.info("\(manifest.logDescription); fetched \(identifierBatch.count) items; processed \(batchItems.count); deferred \(batchDeferredItemCount)") } for deferredBatch in deferredItems.chunked(by: Self.itemsBatchSize) { await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in self.mergeItems(deferredBatch, state: &state, tx: tx) } Logger.info("\(manifest.logDescription); processed \(deferredBatch.count) deferred items") } } private func mergeItems(_ items: some Sequence, state: inout State, tx: DBWriteTransaction) { let contactUpdater = buildContactUpdater() let groupV1Updater = buildGroupV1Updater() let groupV2Updater = buildGroupV2Updater() let storyDistributionListUpdater = buildStoryDistributionListUpdater() let callLinkUpdater = buildCallLinkUpdater() for item in items { func _mergeRecord( _ record: StateUpdater.RecordType, stateUpdater: StateUpdater, ) { self.mergeRecord( record, identifier: item.identifier, state: &state, stateUpdater: stateUpdater, transaction: tx, ) } if let contactRecord = item.contactRecord { _mergeRecord(contactRecord, stateUpdater: contactUpdater) } else if let groupV1Record = item.groupV1Record { _mergeRecord(groupV1Record, stateUpdater: groupV1Updater) } else if let groupV2Record = item.groupV2Record { _mergeRecord(groupV2Record, stateUpdater: groupV2Updater) } else if let storyDistributionListRecord = item.storyDistributionListRecord { _mergeRecord(storyDistributionListRecord, stateUpdater: storyDistributionListUpdater) } else if let callLinkRecord = item.callLinkRecord { _mergeRecord(callLinkRecord, stateUpdater: callLinkUpdater) } else if case .account = item.identifier.type { owsFailDebug("unexpectedly found account record in remaining items") } else { // This is not a record type we know about yet, so record this identifier in // our unknown mapping. This allows us to skip fetching it in the future and // not accidentally blow it away when we push an update. var unknownIdentifiersOfType = state.unknownIdentifiersTypeMap[item.identifier.type] ?? [] unknownIdentifiersOfType.append(item.identifier) state.unknownIdentifiersTypeMap[item.identifier.type] = unknownIdentifiersOfType } } // Saving here records the new storage identifiers with the *old* manifest // version. This allows us to incrementally work through changes in a // manifest, even if we fail part way through the update we'll continue // trying to apply the changes we haven't received yet (since we still know // we're on an older version overall). state.save(clearConsecutiveConflicts: true, transaction: tx) } // MARK: - Clean Up private func cleanUpUnknownData() async { Logger.info("") var (state, migrationVersion) = SSKEnvironment.shared.databaseStorageRef.read { tx in var state = State.current(transaction: tx) normalizePendingMutations(in: &state, transaction: tx) return (state, Self.migrationStore.getInt(Self.versionKey, defaultValue: 0, transaction: tx)) } await self.cleanUpUnknownIdentifiers(in: &state) await self.cleanUpRecordsWithUnknownFields(in: &state) await self.cleanUpOrphanedAccounts(in: &state) switch migrationVersion { case 0: await self.recordPendingMutationsForContactsWithPNIs(in: &state) await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in Self.migrationStore.setInt(1, key: Self.versionKey, transaction: tx) } fallthrough default: break } } private static func isKnownKeyType(_ keyType: StorageServiceProtoManifestRecordKeyType?) -> Bool { switch keyType { case .contact: return true case .groupv1: return true case .groupv2: return true case .account: return true case .storyDistributionList: return true case .callLink: return true case .unknown, .UNRECOGNIZED, nil: return false } } private func cleanUpUnknownIdentifiers(in state: inout State) async { let canParseAnyUnknownIdentifier = state.unknownIdentifiersTypeMap.contains { keyType, unknownIdentifiers in guard Self.isKnownKeyType(keyType) else { // We don't know this type, so it's not parseable. return false } guard !unknownIdentifiers.isEmpty else { // There's no identifiers of this type, so there's nothing to parse. return false } return true } guard canParseAnyUnknownIdentifier else { return } // We may have learned of new record types. If so, we should refetch the // latest manifest so that we can merge these items. await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in state.refetchLatestManifest = true state.save(transaction: tx) } } private func cleanUpRecordsWithUnknownFields(in state: inout State) async { var shouldCleanUpRecordsWithUnknownFields = state.unknownFieldLastCheckedAppVersion != AppVersionImpl.shared.currentAppVersion #if DEBUG // Debug builds don't have proper version numbers but we do want to run // these migrations on them. if !shouldCleanUpRecordsWithUnknownFields { if SSKEnvironment.shared.databaseStorageRef.read(block: { StorageServiceUnknownFieldMigrator.needsAnyUnknownFieldsMigrations(tx: $0) }) { shouldCleanUpRecordsWithUnknownFields = true } } #endif guard shouldCleanUpRecordsWithUnknownFields else { return } state.unknownFieldLastCheckedAppVersion = AppVersionImpl.shared.currentAppVersion func fetchRecordsWithUnknownFields( stateUpdater: some StorageServiceStateUpdater, tx: DBWriteTransaction, ) -> [any MigrateableStorageServiceRecordType] { return stateUpdater.recordsWithUnknownFields(in: state) .lazy .map(\.1) .compactMap { $0 as? (any MigrateableStorageServiceRecordType) } } // For any cached records with unknown fields, optimistically try to merge // with our local data to see if we now understand those fields. Note: It's // possible and expected that we might understand some of the fields that // were previously unknown but not all of them. Even if we can't fully // merge any values, we might partially merge all the values. func mergeRecordsWithUnknownFields( stateUpdater: some StorageServiceStateUpdater, tx: DBWriteTransaction, ) { let recordsWithUnknownFields = stateUpdater.recordsWithUnknownFields(in: state) if recordsWithUnknownFields.isEmpty { return } let debugDescription = "\(type(of: stateUpdater.recordUpdater))" for (localId, recordWithUnknownFields) in recordsWithUnknownFields { guard let storageIdentifier = stateUpdater.storageIdentifier(for: localId, in: state) else { owsFailDebug("Unknown fields: Missing identifier for \(debugDescription)") stateUpdater.setRecordWithUnknownFields(nil, for: localId, in: &state) continue } mergeRecord( recordWithUnknownFields, identifier: storageIdentifier, state: &state, stateUpdater: stateUpdater, transaction: tx, ) } let remainingCount = stateUpdater.recordsWithUnknownFields(in: state).count let resolvedCount = recordsWithUnknownFields.count - remainingCount Logger.info("Unknown fields: Resolved \(resolvedCount) records (\(remainingCount) remaining) for \(debugDescription)") } await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in let stateUpdaters: [any StorageServiceStateUpdater] = [ buildAccountUpdater(), buildContactUpdater(), buildGroupV2Updater(), buildStoryDistributionListUpdater(), buildCallLinkUpdater(), ] if StorageServiceUnknownFieldMigrator.needsAnyUnknownFieldsMigrations(tx: tx) { // First accumulate records to run one-time migrations on. var records: [any MigrateableStorageServiceRecordType] = [] for stateUpdater in stateUpdaters { records.append( contentsOf: fetchRecordsWithUnknownFields( stateUpdater: stateUpdater, tx: tx, ), ) } // Note: we run even if there are no records with "unknown fields". // This is because fields with default values (e.g. a bool with false set) // don't show up in the serialized proto at all. Therefore, if there is an // unknown field sent to us with a default value, we won't even know its // there and it won't show up in "records with unknown fields". // But we should still run migrations, which should assume the default // value was set for any records not passed in. StorageServiceUnknownFieldMigrator.runMigrationsForRecordsWithUnknownFields( records: records, tx: tx, ) } stateUpdaters.forEach { mergeRecordsWithUnknownFields(stateUpdater: $0, tx: tx) } Logger.info("Resolved unknown fields using manifest version \(state.manifestVersion)") state.save(transaction: tx) } } private func cleanUpOrphanedAccounts(in state: inout State) async { // We don't keep unregistered accounts in storage service after a certain // amount of time. We may also have records for accounts that no longer // exist, e.g. that SignalRecipient was merged with another recipient. We // try to proactively delete these records from storage service, but there // was a period of time we didn't, and we need to cleanup after ourselves. let currentDate = Date() let currentConfig: RemoteConfig = .current await recordPendingAccountMutations(in: &state, shouldUpdate: { return $0?.shouldBeInStorageService(currentDate: currentDate, remoteConfig: currentConfig) != true }) } private func recordPendingMutationsForContactsWithPNIs(in state: inout State) async { // We stored invalid PNIs, so run a one-off migration to fix them. await recordPendingAccountMutations(in: &state, shouldUpdate: { $0?.pni != nil }) } private func recordPendingAccountMutations( in state: inout State, caller: String = #function, shouldUpdate: (StorageServiceContact?) -> Bool, ) async { let databaseStorage = SSKEnvironment.shared.databaseStorageRef let recipientDatabaseTable = DependenciesBridge.shared.recipientDatabaseTable let recipientUniqueIds = databaseStorage.read { tx in state.accountIdToIdentifierMap.keys.filter { return shouldUpdate(recipientDatabaseTable.fetchRecipient(uniqueId: $0, tx: tx).flatMap(StorageServiceContact.init(_:))) } } if recipientUniqueIds.isEmpty { return } Logger.info("Marking \(recipientUniqueIds.count) contact records as mutated via \(caller)") await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in var pendingMutations = PendingMutations() pendingMutations.updatedRecipientUniqueIds.formUnion(recipientUniqueIds) Self.recordPendingMutations(pendingMutations, in: &state, transaction: tx) state.save(transaction: tx) } } // MARK: - Record Merge private func mergeRecord( _ record: StateUpdater.RecordType, identifier: StorageService.StorageIdentifier, state: inout State, stateUpdater: StateUpdater, transaction: DBWriteTransaction, ) { var record = record // First apply any migrations if StorageServiceUnknownFieldMigrator.shouldInterceptRemoteManifestBeforeMerging(tx: transaction) { record = StorageServiceUnknownFieldMigrator.interceptRemoteManifestBeforeMerging( record: record, tx: transaction, ) } let mergeResult = stateUpdater.recordUpdater.mergeRecord( record, transaction: transaction, ) switch mergeResult { case .invalid: // This record doesn't have a valid identifier. We can't fix it, so we have // no choice but to delete it. break case .merged(needsUpdate: let needsUpdate, let localId): // Mark that our local state matches the state from storage service. stateUpdater.setStorageIdentifier(identifier, for: localId, in: &state) // If we have local changes that need to be synced, mark the state as // `.updated`. Otherwise, our local state and storage service state match, // so we can clear out any pending sync request. stateUpdater.setChangeState(needsUpdate ? .updated : nil, for: localId, in: &state) // If the record has unknown fields, we need to hold on to it. This allows // future versions of the app to interpret those fields. let hasUnknownFields = stateUpdater.recordUpdater.unknownFields(for: record) != nil stateUpdater.setRecordWithUnknownFields(hasUnknownFields ? record : nil, for: localId, in: &state) } } // MARK: - Record Updaters private func buildAccountUpdater() -> SingleElementStateUpdater { return SingleElementStateUpdater( recordUpdater: StorageServiceAccountRecordUpdater( localIdentifiers: localIdentifiers, isPrimaryDevice: isPrimaryDevice, authedAccount: authedAccount, avatarDefaultColorManager: DependenciesBridge.shared.avatarDefaultColorManager, backupPlanManager: DependenciesBridge.shared.backupPlanManager, backupSubscriptionManager: DependenciesBridge.shared.backupSubscriptionManager, donationSubscriptionManager: DependenciesBridge.shared.donationSubscriptionManager, dmConfigurationStore: DependenciesBridge.shared.disappearingMessagesConfigurationStore, linkPreviewSettingStore: DependenciesBridge.shared.linkPreviewSettingStore, localUsernameManager: DependenciesBridge.shared.localUsernameManager, keyTransparencyManager: DependenciesBridge.shared.keyTransparencyManager, paymentsHelper: SSKEnvironment.shared.paymentsHelperRef, phoneNumberDiscoverabilityManager: DependenciesBridge.shared.phoneNumberDiscoverabilityManager, pinnedThreadManager: DependenciesBridge.shared.pinnedThreadManager, preferences: SSKEnvironment.shared.preferencesRef, profileManager: SSKEnvironment.shared.profileManagerImplRef, receiptManager: SSKEnvironment.shared.receiptManagerRef, recipientDatabaseTable: DependenciesBridge.shared.recipientDatabaseTable, registrationStateChangeManager: DependenciesBridge.shared.registrationStateChangeManager, storageServiceManager: SSKEnvironment.shared.storageServiceManagerRef, systemStoryManager: SSKEnvironment.shared.systemStoryManagerRef, tsAccountManager: DependenciesBridge.shared.tsAccountManager, typingIndicators: SSKEnvironment.shared.typingIndicatorsRef, udManager: SSKEnvironment.shared.udManagerRef, usernameEducationManager: DependenciesBridge.shared.usernameEducationManager, adminDeleteManager: DependenciesBridge.shared.adminDeleteManager, ), changeState: \.localAccountChangeState, storageIdentifier: \.localAccountIdentifier, recordWithUnknownFields: \.localAccountRecordWithUnknownFields, ) } private func buildContactUpdater() -> MultipleElementStateUpdater { return MultipleElementStateUpdater( recordUpdater: StorageServiceContactRecordUpdater( localIdentifiers: localIdentifiers, isPrimaryDevice: isPrimaryDevice, authedAccount: authedAccount, avatarDefaultColorManager: DependenciesBridge.shared.avatarDefaultColorManager, blockingManager: SSKEnvironment.shared.blockingManagerRef, contactsManager: SSKEnvironment.shared.contactManagerImplRef, identityManager: DependenciesBridge.shared.identityManager, nicknameManager: DependenciesBridge.shared.nicknameManager, profileFetcher: SSKEnvironment.shared.profileFetcherRef, profileManager: SSKEnvironment.shared.profileManagerImplRef, recipientDatabaseTable: DependenciesBridge.shared.recipientDatabaseTable, recipientManager: DependenciesBridge.shared.recipientManager, recipientMerger: DependenciesBridge.shared.recipientMerger, recipientHidingManager: DependenciesBridge.shared.recipientHidingManager, remoteConfigProvider: SSKEnvironment.shared.remoteConfigManagerRef, signalServiceAddressCache: SSKEnvironment.shared.signalServiceAddressCacheRef, tsAccountManager: DependenciesBridge.shared.tsAccountManager, usernameLookupManager: DependenciesBridge.shared.usernameLookupManager, ), changeState: \.accountIdChangeMap, storageIdentifier: \.accountIdToIdentifierMap, recordWithUnknownFields: \.accountIdToRecordWithUnknownFields, ) } private func buildGroupV1Updater() -> MultipleElementStateUpdater { return MultipleElementStateUpdater( recordUpdater: StorageServiceGroupV1RecordUpdater(), changeState: \.groupV1ChangeMap, storageIdentifier: \.groupV1IdToIdentifierMap, recordWithUnknownFields: \.groupV1IdToRecordWithUnknownFields, ) } private func buildGroupV2Updater() -> MultipleElementStateUpdater { return MultipleElementStateUpdater( recordUpdater: StorageServiceGroupV2RecordUpdater( authedAccount: authedAccount, isPrimaryDevice: isPrimaryDevice, avatarDefaultColorManager: DependenciesBridge.shared.avatarDefaultColorManager, blockingManager: SSKEnvironment.shared.blockingManagerRef, groupsV2: SSKEnvironment.shared.groupsV2Ref, profileManager: SSKEnvironment.shared.profileManagerRef, ), changeState: \.groupV2ChangeMap, storageIdentifier: \.groupV2MasterKeyToIdentifierMap, recordWithUnknownFields: \.groupV2MasterKeyToRecordWithUnknownFields, ) } private func buildStoryDistributionListUpdater() -> MultipleElementStateUpdater { return MultipleElementStateUpdater( recordUpdater: StorageServiceStoryDistributionListRecordUpdater( privateStoryThreadDeletionManager: DependenciesBridge.shared.privateStoryThreadDeletionManager, recipientDatabaseTable: DependenciesBridge.shared.recipientDatabaseTable, recipientFetcher: DependenciesBridge.shared.recipientFetcher, storyRecipientManager: DependenciesBridge.shared.storyRecipientManager, storyRecipientStore: DependenciesBridge.shared.storyRecipientStore, threadRemover: DependenciesBridge.shared.threadRemover, ), changeState: \.storyDistributionListChangeMap, storageIdentifier: \.storyDistributionListIdentifierToStorageIdentifierMap, recordWithUnknownFields: \.storyDistributionListIdentifierToRecordWithUnknownFields, ) } private func buildCallLinkUpdater() -> MultipleElementStateUpdater { return MultipleElementStateUpdater( recordUpdater: StorageServiceCallLinkRecordUpdater( callLinkStore: DependenciesBridge.shared.callLinkStore, callRecordDeleteManager: DependenciesBridge.shared.callRecordDeleteManager, callRecordStore: DependenciesBridge.shared.callRecordStore, ), changeState: \.callLinkRootKeyChangeMap, storageIdentifier: \.callLinkRootKeyToStorageIdentifierMap, recordWithUnknownFields: \.callLinkRootKeyToRecordWithUnknownFields, ) } // MARK: - State private static var maxConsecutiveConflicts = 3 struct State: Codable { fileprivate var manifestVersion: UInt64 = 0 private var _refetchLatestManifest: Bool? fileprivate var refetchLatestManifest: Bool { get { _refetchLatestManifest ?? false } set { _refetchLatestManifest = newValue } } /// Input Keying Material (IKM) used to encrypt records tracked by the /// current manifest. fileprivate var manifestRecordIkm: Data? fileprivate var consecutiveConflicts: Int = 0 fileprivate var localAccountIdentifier: StorageService.StorageIdentifier? fileprivate var localAccountRecordWithUnknownFields: StorageServiceProtoAccountRecord? @BidirectionalLegacyDecoding fileprivate var accountIdToIdentifierMap: [RecipientUniqueId: StorageService.StorageIdentifier] = [:] private var _accountIdToRecordWithUnknownFields: [RecipientUniqueId: StorageServiceProtoContactRecord]? var accountIdToRecordWithUnknownFields: [RecipientUniqueId: StorageServiceProtoContactRecord] { get { _accountIdToRecordWithUnknownFields ?? [:] } set { _accountIdToRecordWithUnknownFields = newValue } } @BidirectionalLegacyDecoding fileprivate var groupV1IdToIdentifierMap: [Data: StorageService.StorageIdentifier] = [:] private var _groupV1IdToRecordWithUnknownFields: [Data: StorageServiceProtoGroupV1Record]? var groupV1IdToRecordWithUnknownFields: [Data: StorageServiceProtoGroupV1Record] { get { _groupV1IdToRecordWithUnknownFields ?? [:] } set { _groupV1IdToRecordWithUnknownFields = newValue } } @BidirectionalLegacyDecoding fileprivate var groupV2MasterKeyToIdentifierMap: [Data: StorageService.StorageIdentifier] = [:] private var _groupV2MasterKeyToRecordWithUnknownFields: [Data: StorageServiceProtoGroupV2Record]? var groupV2MasterKeyToRecordWithUnknownFields: [Data: StorageServiceProtoGroupV2Record] { get { _groupV2MasterKeyToRecordWithUnknownFields ?? [:] } set { _groupV2MasterKeyToRecordWithUnknownFields = newValue } } private var _storyDistributionListIdentifierToStorageIdentifierMap: [Data: StorageService.StorageIdentifier]? fileprivate var storyDistributionListIdentifierToStorageIdentifierMap: [Data: StorageService.StorageIdentifier] { get { _storyDistributionListIdentifierToStorageIdentifierMap ?? [:] } set { _storyDistributionListIdentifierToStorageIdentifierMap = newValue } } private var _storyDistributionListIdentifierToRecordWithUnknownFields: [Data: StorageServiceProtoStoryDistributionListRecord]? fileprivate var storyDistributionListIdentifierToRecordWithUnknownFields: [Data: StorageServiceProtoStoryDistributionListRecord] { get { _storyDistributionListIdentifierToRecordWithUnknownFields ?? [:] } set { _storyDistributionListIdentifierToRecordWithUnknownFields = newValue } } fileprivate var unknownIdentifiersTypeMap: [StorageServiceProtoManifestRecordKeyType: [StorageService.StorageIdentifier]] = [:] fileprivate var unknownIdentifiers: [StorageService.StorageIdentifier] { unknownIdentifiersTypeMap.values.flatMap { $0 } } /// Invalid identifiers from the most recent merge that should be removed /// during the next mutation. fileprivate var invalidIdentifiers: Set { get { _invalidIdentifiers ?? Set() } set { _invalidIdentifiers = newValue.isEmpty ? nil : newValue } } fileprivate var _invalidIdentifiers: Set? /// The app version from the last time we checked unknown fields. We can /// only transition unknown fields to known fields via an update, so we only /// need to check once per app version. fileprivate var unknownFieldLastCheckedAppVersion: String? enum ChangeState: Int, Codable { case unchanged = 0 case updated = 1 /// This is mostly vestigial, but even when we no longer assign this status /// in new versions of the application, we'll still need to support reading /// it (for times when it was written by prior versions of the application). case deleted = 2 } fileprivate var localAccountChangeState: ChangeState = .unchanged fileprivate var accountIdChangeMap: [RecipientUniqueId: ChangeState] = [:] fileprivate var groupV2ChangeMap: [Data: ChangeState] = [:] /// We will no longer update this value, and want to also ignore this /// value in any previously-persisted state. @EmptyForCodable fileprivate var groupV1ChangeMap: [Data: ChangeState] = [:] private var _storyDistributionListChangeMap: [Data: ChangeState]? fileprivate var storyDistributionListChangeMap: [Data: ChangeState] { get { _storyDistributionListChangeMap ?? [:] } set { _storyDistributionListChangeMap = newValue } } private var _callLinkRootKeyChangeMap: [Data: ChangeState]? fileprivate var callLinkRootKeyChangeMap: [Data: ChangeState] { get { _callLinkRootKeyChangeMap ?? [:] } set { _callLinkRootKeyChangeMap = newValue } } private var _callLinkRootKeyToStorageIdentifierMap: [Data: StorageService.StorageIdentifier]? fileprivate var callLinkRootKeyToStorageIdentifierMap: [Data: StorageService.StorageIdentifier] { get { _callLinkRootKeyToStorageIdentifierMap ?? [:] } set { _callLinkRootKeyToStorageIdentifierMap = newValue } } private var _callLinkRootKeyToRecordWithUnknownFields: [Data: StorageServiceProtoCallLinkRecord]? fileprivate var callLinkRootKeyToRecordWithUnknownFields: [Data: StorageServiceProtoCallLinkRecord] { get { _callLinkRootKeyToRecordWithUnknownFields ?? [:] } set { _callLinkRootKeyToRecordWithUnknownFields = newValue } } fileprivate var allIdentifiers: [StorageService.StorageIdentifier] { var allIdentifiers = [StorageService.StorageIdentifier]() if let localAccountIdentifier { allIdentifiers.append(localAccountIdentifier) } allIdentifiers += accountIdToIdentifierMap.values allIdentifiers += groupV1IdToIdentifierMap.values allIdentifiers += groupV2MasterKeyToIdentifierMap.values allIdentifiers += storyDistributionListIdentifierToStorageIdentifierMap.values allIdentifiers += callLinkRootKeyToStorageIdentifierMap.values // We must persist any unknown identifiers, as they are potentially associated with // valid records that this version of the app doesn't yet understand how to parse. // Otherwise, this will cause ping-ponging with newer apps when they try and backup // new types of records, and then we subsequently delete them. allIdentifiers += unknownIdentifiers return allIdentifiers } private static let stateKey = "state" fileprivate static func current(transaction: DBReadTransaction) -> State { guard let stateData = keyValueStore.getData(stateKey, transaction: transaction) else { return State() } guard let current = try? JSONDecoder().decode(State.self, from: stateData) else { owsFailDebug("failed to decode state data") return State() } return current } fileprivate mutating func save(clearConsecutiveConflicts: Bool = false, transaction: DBWriteTransaction) { if clearConsecutiveConflicts { consecutiveConflicts = 0 } guard let stateData = try? JSONEncoder().encode(self) else { return owsFailDebug("failed to encode state data") } keyValueStore.setData(stateData, key: State.stateKey, transaction: transaction) } } } // MARK: - State Updaters protocol StorageServiceStateUpdater { associatedtype RecordUpdaterType: StorageServiceRecordUpdater typealias IdType = RecordUpdaterType.IdType typealias RecordType = RecordUpdaterType.RecordType typealias State = StorageServiceOperation.State var recordUpdater: RecordUpdaterType { get } func changeState(for localId: IdType, in state: State) -> State.ChangeState? func setChangeState(_ changeState: State.ChangeState?, for localId: IdType, in state: inout State) func resetAndEnumerateChangeStates(in state: inout State, block: (inout State, IdType, State.ChangeState) -> Void) func storageIdentifier(for localId: IdType, in state: State) -> StorageService.StorageIdentifier? func setStorageIdentifier(_ storageIdentifier: StorageService.StorageIdentifier?, for localId: IdType, in state: inout State) func recordWithUnknownFields(for localId: IdType, in state: State) -> RecordType? func setRecordWithUnknownFields(_ recordWithUnknownFields: RecordType?, for localId: IdType, in state: inout State) func recordsWithUnknownFields(in state: State) -> [(IdType, RecordType)] } private struct SingleElementStateUpdater: StorageServiceStateUpdater where RecordUpdaterType.IdType == Void { typealias IdType = RecordUpdaterType.IdType typealias RecordType = RecordUpdaterType.RecordType typealias State = StorageServiceOperation.State let recordUpdater: RecordUpdaterType private let changeStateKeyPath: WritableKeyPath private let storageIdentifierKeyPath: WritableKeyPath private let recordWithUnknownFieldsKeyPath: WritableKeyPath init( recordUpdater: RecordUpdaterType, changeState: WritableKeyPath, storageIdentifier: WritableKeyPath, recordWithUnknownFields: WritableKeyPath, ) { self.recordUpdater = recordUpdater self.changeStateKeyPath = changeState self.storageIdentifierKeyPath = storageIdentifier self.recordWithUnknownFieldsKeyPath = recordWithUnknownFields } func changeState(for localId: IdType, in state: State) -> State.ChangeState? { state[keyPath: changeStateKeyPath] } func setChangeState(_ changeState: State.ChangeState?, for localId: IdType, in state: inout State) { state[keyPath: changeStateKeyPath] = changeState ?? .unchanged } func resetAndEnumerateChangeStates(in state: inout State, block: (inout State, IdType, State.ChangeState) -> Void) { let oldState = state[keyPath: changeStateKeyPath] state[keyPath: changeStateKeyPath] = .unchanged block(&state, (), oldState) } func storageIdentifier(for localId: IdType, in state: State) -> StorageService.StorageIdentifier? { state[keyPath: storageIdentifierKeyPath] } func setStorageIdentifier(_ storageIdentifier: StorageService.StorageIdentifier?, for localId: IdType, in state: inout State) { state[keyPath: storageIdentifierKeyPath] = storageIdentifier } func recordWithUnknownFields(for localId: IdType, in state: State) -> RecordType? { state[keyPath: recordWithUnknownFieldsKeyPath] } func setRecordWithUnknownFields(_ recordWithUnknownFields: RecordType?, for localId: IdType, in state: inout State) { state[keyPath: recordWithUnknownFieldsKeyPath] = recordWithUnknownFields } func recordsWithUnknownFields(in state: State) -> [(IdType, RecordType)] { guard let recordWithUnknownFields = state[keyPath: recordWithUnknownFieldsKeyPath] else { return [] } return [((), recordWithUnknownFields)] } } private struct MultipleElementStateUpdater: StorageServiceStateUpdater where RecordUpdaterType.IdType: Hashable { typealias IdType = RecordUpdaterType.IdType typealias RecordType = RecordUpdaterType.RecordType typealias State = StorageServiceOperation.State let recordUpdater: RecordUpdaterType private let changeStateKeyPath: WritableKeyPath private let storageIdentifierKeyPath: WritableKeyPath private let recordWithUnknownFieldsKeyPath: WritableKeyPath init( recordUpdater: RecordUpdaterType, changeState: WritableKeyPath, storageIdentifier: WritableKeyPath, recordWithUnknownFields: WritableKeyPath, ) { self.recordUpdater = recordUpdater self.changeStateKeyPath = changeState self.storageIdentifierKeyPath = storageIdentifier self.recordWithUnknownFieldsKeyPath = recordWithUnknownFields } func changeState(for localId: IdType, in state: State) -> State.ChangeState? { state[keyPath: changeStateKeyPath][localId] } func setChangeState(_ changeState: State.ChangeState?, for localId: IdType, in state: inout State) { state[keyPath: changeStateKeyPath][localId] = changeState } func resetAndEnumerateChangeStates(in state: inout State, block: (inout State, IdType, State.ChangeState) -> Void) { let oldValue = state[keyPath: changeStateKeyPath] state[keyPath: changeStateKeyPath] = [:] for (localId, changeState) in oldValue { block(&state, localId, changeState) } } func storageIdentifier(for localId: IdType, in state: State) -> StorageService.StorageIdentifier? { state[keyPath: storageIdentifierKeyPath][localId] } func setStorageIdentifier(_ storageIdentifier: StorageService.StorageIdentifier?, for localId: IdType, in state: inout State) { state[keyPath: storageIdentifierKeyPath][localId] = storageIdentifier } func recordWithUnknownFields(for localId: IdType, in state: State) -> RecordType? { state[keyPath: recordWithUnknownFieldsKeyPath][localId] } func setRecordWithUnknownFields(_ recordWithUnknownFields: RecordType?, for localId: IdType, in state: inout State) { state[keyPath: recordWithUnknownFieldsKeyPath][localId] = recordWithUnknownFields } func recordsWithUnknownFields(in state: State) -> [(IdType, RecordType)] { state[keyPath: recordWithUnknownFieldsKeyPath].map { $0 } } } // MARK: - Legacy Codable extension Dictionary: EmptyInitializable {} /// Optionally attempts decoding a dictionary as a BidirectionalDictionary, /// in case it was previously stored in that format. @propertyWrapper private struct BidirectionalLegacyDecoding: Codable { enum BidirectionalDictionaryCodingKeys: String, CodingKey { case forwardDictionary case backwardDictionary } var wrappedValue: Value init(wrappedValue: Value) { self.wrappedValue = wrappedValue } init(from decoder: Swift.Decoder) throws { do { // First, try and decode as if we're just a dictionary. wrappedValue = try Value(from: decoder) } catch DecodingError.keyNotFound, DecodingError.typeMismatch { // If we hit a decoding error, try and decode as if // we were a BidirectionalDictionary. let bidirectionalContainer = try decoder.container(keyedBy: BidirectionalDictionaryCodingKeys.self) wrappedValue = try bidirectionalContainer.decode(Value.self, forKey: .forwardDictionary) } } func encode(to encoder: Encoder) throws { try wrappedValue.encode(to: encoder) } } // MARK: - StorageServiceProtoManifestRecord private extension StorageServiceProtoManifestRecord { var logDescription: String { "v[\(version)].\(sourceDevice)" } }