// // Copyright 2025 Signal Messenger, LLC // SPDX-License-Identifier: AGPL-3.0-only // import Foundation struct ConnectionLock { /// Byte offsets in this file are locked as follows: /// - Byte 0: The connection lock. /// - Bytes 1...(N-1): A lock indicating the process with priority N wants /// the connection lock. (The byte for the lowest priority process (i.e., /// priority == priorityCount) isn't locked because it's not contested.) private let fileLock: Result private let priority: Int private let priorityCount: Int init(filePath: String, priority: Int, of priorityCount: Int) { self.fileLock = Result(catching: { () throws(POSIXError) in return try FileLock(filePath: filePath) }) self.priority = priority self.priorityCount = priorityCount } func close() { try? self.fileLock.get().close() } struct HeldLock { fileprivate var observerToken: DarwinNotificationCenter.ObserverToken? } /// Acquires a cross-process lock for this process. /// /// When a more important (i.e., lower priority number) process requests a /// lock, less important processes are notified (via `onInterrupt`) and are /// expected to quickly release the lock. /// /// - Throws: A POSIXError or a CancellationError func lock(onInterrupt: (queue: DispatchQueue, callback: () -> Void)) async throws -> HeldLock { let fileLock = try self.fileLock.get() var observerToken: DarwinNotificationCenter.ObserverToken? defer { if let observerToken { DarwinNotificationCenter.removeObserver(observerToken) } } // If we're not the most important, listen for interruptions & make sure // we're not racing with a more important process. if self.priority > 1 { observerToken = DarwinNotificationCenter.addObserver( name: .connectionLock(for: self.priority), queue: onInterrupt.queue, block: { _ in onInterrupt.callback() }, ) // More important processes hold this lock from BEFORE they post a // notification until AFTER they've acquired the connection lock. By // immediately locking & unlocking, we either run before they acquire this // lock (and will observe the notification they send) or strictly after // they acquire this lock (and will fail to acquire the connection lock). try await fileLock.lockWithCancellationHandler(range: 1.. 0 else { throw errorForErrno() } self.fd = result } func close() { _ = Darwin.close(self.fd) } /// See `man fcntl`. private func fcntl(range: Range, shouldLock: Bool, shouldBlock: Bool) -> Result { owsPrecondition(shouldLock || !shouldBlock) var req = flock() req.l_start = off_t(range.lowerBound) req.l_len = off_t(range.upperBound - range.lowerBound) req.l_whence = Int16(SEEK_SET) req.l_type = Int16(shouldLock ? F_WRLCK : F_UNLCK) let result = Darwin.fcntl(self.fd, shouldBlock ? F_SETLKW : F_SETLK, &req) if result == -1 { return .failure(errorForErrno()) } return .success(()) } func lock(at offset: Int) async throws(POSIXError) { try await self.lock(range: offset..<(offset + 1)) } func lock(range: Range) async throws(POSIXError) { try await withCheckedContinuation { (continuation: CheckedContinuation, Never>) in // We're going to block, so don't block the cooperative thread pool. DispatchQueue.global().async { continuation.resume(returning: self.fcntl(range: range, shouldLock: true, shouldBlock: true)) } }.get() } func lockWithCancellationHandler(at offset: Int, maxAveragePollingInterval: TimeInterval = 3) async throws { try await self.lockWithCancellationHandler(range: offset..<(offset + 1), maxAveragePollingInterval: maxAveragePollingInterval) } func lockWithCancellationHandler(range: Range, maxAveragePollingInterval: TimeInterval = 3) async throws { // In most cases, we don't expect contention. But it is possible, and we // want to remain cancellable during times of contention, so we poll. try await Retry.performWithBackoff( maxAttempts: .max, minAverageBackoff: 0.1, maxAverageBackoff: maxAveragePollingInterval, isRetryable: { $0.code == .EAGAIN }, block: { () throws(POSIXError) in try self.tryLock(range: range) }, ) } func tryLock(at offset: Int) throws(POSIXError) { try self.tryLock(range: offset..<(offset + 1)) } func tryLock(range: Range) throws(POSIXError) { try self.fcntl(range: range, shouldLock: true, shouldBlock: false).get() } func unlock(at offset: Int) throws(POSIXError) { try self.unlock(range: offset..<(offset + 1)) } func unlock(range: Range) throws(POSIXError) { try self.fcntl(range: range, shouldLock: false, shouldBlock: false).get() } } private func errorForErrno() -> POSIXError { return POSIXError(POSIXErrorCode(rawValue: errno)!) }