diff --git a/apps/ios/SimpleX NSE/ConcurrentQueue.swift b/apps/ios/SimpleX NSE/ConcurrentQueue.swift deleted file mode 100644 index 274a683c00..0000000000 --- a/apps/ios/SimpleX NSE/ConcurrentQueue.swift +++ /dev/null @@ -1,64 +0,0 @@ -// -// ConcurrentQueue.swift -// SimpleX NSE -// -// Created by Evgeny on 08/12/2023. -// Copyright © 2023 SimpleX Chat. All rights reserved. -// - -import Foundation - -struct DequeueElement { - var elementId: UUID? - var task: Task -} - -class ConcurrentQueue { - private var queue: [T] = [] - private var queueLock = DispatchQueue(label: "chat.simplex.app.SimpleX-NSE.concurrent-queue.lock.\(UUID())") - private var continuations = [(elementId: UUID, continuation: CheckedContinuation)]() - - func enqueue(_ el: T) { - resumeContinuation(el) { self.queue.append(el) } - } - - func frontEnqueue(_ el: T) { - resumeContinuation(el) { self.queue.insert(el, at: 0) } - } - - private func resumeContinuation(_ el: T, add: @escaping () -> Void) { - queueLock.sync { - if let (_, cont) = continuations.first { - continuations.remove(at: 0) - cont.resume(returning: el) - } else { - add() - } - } - } - - func dequeue() -> DequeueElement { - queueLock.sync { - if queue.isEmpty { - let elementId = UUID() - let task = Task { - await withCheckedContinuation { cont in - continuations.append((elementId, cont)) - } - } - return DequeueElement(elementId: elementId, task: task) - } else { - let el = queue.remove(at: 0) - return DequeueElement(task: Task { el }) - } - } - } - - func cancelDequeue(_ elementId: UUID) { - queueLock.sync { - let cancelled = continuations.filter { $0.elementId == elementId } - continuations.removeAll { $0.elementId == elementId } - cancelled.forEach { $0.continuation.resume(returning: nil) } - } - } -} diff --git a/apps/ios/SimpleX NSE/NotificationService.swift b/apps/ios/SimpleX NSE/NotificationService.swift index 6f76781837..3da4775382 100644 --- a/apps/ios/SimpleX NSE/NotificationService.swift +++ b/apps/ios/SimpleX NSE/NotificationService.swift @@ -22,110 +22,6 @@ let nseSuspendSchedule: SuspendSchedule = (2, 4) let fastNSESuspendSchedule: SuspendSchedule = (1, 1) -typealias NtfStream = ConcurrentQueue - -// Notifications are delivered via concurrent queues, as they are all received from chat controller in a single loop that -// writes to ConcurrentQueue and when notification is processed, the instance of Notification service extension reads from the queue. -// One queue per connection (entity) is used. -// The concurrent queues allow read cancellation, to ensure that notifications are not lost in case the current thread completes -// before expected notification is read (multiple notifications can be expected, because one notification can be delivered for several messages). -actor PendingNtfs { - static let shared = PendingNtfs() - private var ntfStreams: [String: NtfStream] = [:] - - func createStream(_ id: String) async { - logger.debug("NotificationService PendingNtfs.createStream: \(id)") - if ntfStreams[id] == nil { - ntfStreams[id] = ConcurrentQueue() - logger.debug("NotificationService PendingNtfs.createStream: created ConcurrentQueue") - } - } - - func readStream(_ id: String, for nse: NotificationService, ntfInfo: NtfMessages) async { - logger.debug("NotificationService PendingNtfs.readStream: \(id) \(ntfInfo.ntfMessages.count)") - if !ntfInfo.user.showNotifications { - nse.setBestAttemptNtf(.empty) - } - if let s = ntfStreams[id] { - logger.debug("NotificationService PendingNtfs.readStream: has stream") - var expected = Set(ntfInfo.ntfMessages.map { $0.msgId }) - logger.debug("NotificationService PendingNtfs.readStream: expecting: \(expected)") - var readCancelled = false - var dequeued: DequeueElement? - nse.cancelRead = { - readCancelled = true - if let elementId = dequeued?.elementId { - s.cancelDequeue(elementId) - } - } - while !readCancelled { - dequeued = s.dequeue() - if let ntf = await dequeued?.task.value { - if readCancelled { - logger.debug("NotificationService PendingNtfs.readStream: read cancelled, put ntf to queue front") - s.frontEnqueue(ntf) - break - } else if case let .msgInfo(info) = ntf { - let found = expected.remove(info.msgId) - if found != nil { - logger.debug("NotificationService PendingNtfs.readStream: msgInfo, last: \(expected.isEmpty)") - if expected.isEmpty { break } - } else if let msgTs = ntfInfo.msgTs, info.msgTs > msgTs { - logger.debug("NotificationService PendingNtfs.readStream: unexpected msgInfo") - s.frontEnqueue(ntf) - break - } - } else if ntfInfo.user.showNotifications { - logger.debug("NotificationService PendingNtfs.readStream: setting best attempt") - nse.setBestAttemptNtf(ntf) - if ntf.isCallInvitation { break } - } - } else { - break - } - } - nse.cancelRead = nil - logger.debug("NotificationService PendingNtfs.readStream: exiting") - } - } - - func writeStream(_ id: String, _ ntf: NSENotification) async { - logger.debug("NotificationService PendingNtfs.writeStream: \(id)") - if let s = ntfStreams[id] { - logger.debug("NotificationService PendingNtfs.writeStream: writing ntf") - s.enqueue(ntf) - } - } -} - -// The current implementation assumes concurrent notification delivery and uses semaphores -// to process only one notification per connection (entity) at a time. -class NtfStreamSemaphores { - static let shared = NtfStreamSemaphores() - private static let queue = DispatchQueue(label: "chat.simplex.app.SimpleX-NSE.notification-semaphores.lock") - private var semaphores: [String: DispatchSemaphore] = [:] - - func waitForStream(_ id: String) { - streamSemaphore(id, value: 0)?.wait() - } - - func signalStreamReady(_ id: String) { - streamSemaphore(id, value: 1)?.signal() - } - - // this function returns nil if semaphore is just created, so passed value shoud be coordinated with the desired end value of the semaphore - private func streamSemaphore(_ id: String, value: Int) -> DispatchSemaphore? { - NtfStreamSemaphores.queue.sync { - if let s = semaphores[id] { - return s - } else { - semaphores[id] = DispatchSemaphore(value: value) - return nil - } - } - } -} - enum NSENotification { case nse(UNMutableNotificationContent) case callkit(RcvCallInvitation) @@ -149,7 +45,7 @@ class NSEThreads { static let shared = NSEThreads() private static let queue = DispatchQueue(label: "chat.simplex.app.SimpleX-NSE.notification-threads.lock") private var allThreads: Set = [] - private var activeThreads: Set = [] + private var activeThreads: [(UUID, NotificationService)] = [] func newThread() -> UUID { NSEThreads.queue.sync { @@ -158,19 +54,38 @@ class NSEThreads { } } - func startThread(_ t: UUID) { + func startThread(_ t: UUID, _ service: NotificationService) { NSEThreads.queue.sync { if allThreads.contains(t) { - _ = activeThreads.insert(t) + activeThreads.append((t, service)) } else { logger.warning("NotificationService startThread: thread \(t) was removed before it started") } } } + func processNotification(_ id: ChatId, _ ntf: NSENotification) async -> Void { + var timeoutOfWaiting: Int64 = 5_000_000000 + while timeoutOfWaiting > 0 { + let activeThread = NSEThreads.queue.sync { + activeThreads.first(where: { (_, nse) in nse.receiveEntityId == id }) + } + if activeThread?.1.processReceivedNtf?(ntf) == true { + break + } else { + try? await Task.sleep(nanoseconds: 10_000000) + timeoutOfWaiting -= 10_000000 + } + } + } + func endThread(_ t: UUID) -> Bool { NSEThreads.queue.sync { - let tActive = activeThreads.remove(t) + let tActive: UUID? = if let index = activeThreads.firstIndex(where: { $0.0 == t }) { + activeThreads.remove(at: index).0 + } else { + nil + } let t = allThreads.remove(t) if tActive != nil && activeThreads.isEmpty { return true @@ -199,7 +114,8 @@ class NotificationService: UNNotificationServiceExtension { // chat does not need to be suspended but NSE state still needs to be set to "suspended". var threadId: UUID? = NSEThreads.shared.newThread() var receiveEntityId: String? - var cancelRead: (() -> Void)? + // return true if the message is taken - it prevents sending it to another NotificationService instance for processing + var processReceivedNtf: ((NSENotification) -> Bool)? var appSubscriber: AppSubscriber? var returnedSuspension = false @@ -265,7 +181,7 @@ class NotificationService: UNNotificationServiceExtension { // check it here again appStateGroupDefault.get().inactive { // thread is added to activeThreads tracking set here - if thread started chat it needs to be suspended - if let t = threadId { NSEThreads.shared.startThread(t) } + if let t = threadId { NSEThreads.shared.startThread(t, self) } let dbStatus = startChat() if case .ok = dbStatus, let ntfInfo = apiGetNtfMessage(nonce: nonce, encNtfInfo: encNtfInfo) { @@ -276,17 +192,39 @@ class NotificationService: UNNotificationServiceExtension { ? .nse(createConnectionEventNtf(ntfInfo.user, connEntity)) : .empty ) - if let id = connEntity.id { - receiveEntityId = id - NtfStreamSemaphores.shared.waitForStream(id) - if receiveEntityId != nil { - Task { - logger.debug("NotificationService: receiveNtfMessages: in Task, connEntity id \(id)") - await PendingNtfs.shared.createStream(id) - await PendingNtfs.shared.readStream(id, for: self, ntfInfo: ntfInfo) - deliverBestAttemptNtf() + if let id = connEntity.id, let msgTs = ntfInfo.msgTs { + var expected = Set(ntfInfo.ntfMessages.map { $0.msgId }) + processReceivedNtf = { ntf in + if !ntfInfo.user.showNotifications { + self.setBestAttemptNtf(.empty) } + if case let .msgInfo(info) = ntf { + let found = expected.remove(info.msgId) + if found != nil { + logger.debug("NotificationService processNtf: msgInfo, last: \(expected.isEmpty)") + if expected.isEmpty { + self.deliverBestAttemptNtf() + } + return true + } else if info.msgTs > msgTs { + logger.debug("NotificationService processNtf: unexpected msgInfo, let other instance to process it, stopping this one") + self.deliverBestAttemptNtf() + return false + } else { + logger.debug("NotificationService processNtf: unknown message, let other instance to process it") + return false + } + } else if ntfInfo.user.showNotifications { + logger.debug("NotificationService processNtf: setting best attempt") + self.setBestAttemptNtf(ntf) + if ntf.isCallInvitation { + self.deliverBestAttemptNtf() + } + return true + } + return false } + receiveEntityId = id return } } @@ -323,14 +261,9 @@ class NotificationService: UNNotificationServiceExtension { private func deliverBestAttemptNtf(urgent: Bool = false) { logger.debug("NotificationService.deliverBestAttemptNtf") - if let cancel = cancelRead { - cancelRead = nil - cancel() - } - if let id = receiveEntityId { - receiveEntityId = nil - NtfStreamSemaphores.shared.signalStreamReady(id) - } + // stop processing other messages + processReceivedNtf = nil + let suspend: Bool if let t = threadId { threadId = nil @@ -572,7 +505,7 @@ func chatSuspended() { } // A single loop is used per Notification service extension process to receive and process all messages depending on the NSE state -// If the extension is not active yet, or suspended/suspending, or the app is running, the notifications will no be received. +// If the extension is not active yet, or suspended/suspending, or the app is running, the notifications will not be received. func receiveMessages() async { logger.debug("NotificationService receiveMessages") while true { @@ -591,8 +524,7 @@ func receiveMessages() async { logger.debug("NotificationService receiveMsg: message") if let (id, ntf) = await receivedMsgNtf(msg) { logger.debug("NotificationService receiveMsg: notification") - await PendingNtfs.shared.createStream(id) - await PendingNtfs.shared.writeStream(id, ntf) + await NSEThreads.shared.processNotification(id, ntf) } } } diff --git a/apps/ios/SimpleX.xcodeproj/project.pbxproj b/apps/ios/SimpleX.xcodeproj/project.pbxproj index 9e43249650..b6578e659b 100644 --- a/apps/ios/SimpleX.xcodeproj/project.pbxproj +++ b/apps/ios/SimpleX.xcodeproj/project.pbxproj @@ -144,7 +144,6 @@ 5CEACCED27DEA495000BD591 /* MsgContentView.swift in Sources */ = {isa = PBXBuildFile; fileRef = 5CEACCEC27DEA495000BD591 /* MsgContentView.swift */; }; 5CEBD7462A5C0A8F00665FE2 /* KeyboardPadding.swift in Sources */ = {isa = PBXBuildFile; fileRef = 5CEBD7452A5C0A8F00665FE2 /* KeyboardPadding.swift */; }; 5CEBD7482A5F115D00665FE2 /* SetDeliveryReceiptsView.swift in Sources */ = {isa = PBXBuildFile; fileRef = 5CEBD7472A5F115D00665FE2 /* SetDeliveryReceiptsView.swift */; }; - 5CF9371E2B23429500E1D781 /* ConcurrentQueue.swift in Sources */ = {isa = PBXBuildFile; fileRef = 5CF9371D2B23429500E1D781 /* ConcurrentQueue.swift */; }; 5CF937202B24DE8C00E1D781 /* SharedFileSubscriber.swift in Sources */ = {isa = PBXBuildFile; fileRef = 5CF9371F2B24DE8C00E1D781 /* SharedFileSubscriber.swift */; }; 5CF937232B2503D000E1D781 /* NSESubscriber.swift in Sources */ = {isa = PBXBuildFile; fileRef = 5CF937212B25034A00E1D781 /* NSESubscriber.swift */; }; 5CFA59C42860BC6200863A68 /* MigrateToAppGroupView.swift in Sources */ = {isa = PBXBuildFile; fileRef = 5CFA59C32860BC6200863A68 /* MigrateToAppGroupView.swift */; }; @@ -437,7 +436,6 @@ 5CEACCEC27DEA495000BD591 /* MsgContentView.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = MsgContentView.swift; sourceTree = ""; }; 5CEBD7452A5C0A8F00665FE2 /* KeyboardPadding.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = KeyboardPadding.swift; sourceTree = ""; }; 5CEBD7472A5F115D00665FE2 /* SetDeliveryReceiptsView.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = SetDeliveryReceiptsView.swift; sourceTree = ""; }; - 5CF9371D2B23429500E1D781 /* ConcurrentQueue.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ConcurrentQueue.swift; sourceTree = ""; }; 5CF9371F2B24DE8C00E1D781 /* SharedFileSubscriber.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = SharedFileSubscriber.swift; sourceTree = ""; }; 5CF937212B25034A00E1D781 /* NSESubscriber.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = NSESubscriber.swift; sourceTree = ""; }; 5CFA59C32860BC6200863A68 /* MigrateToAppGroupView.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = MigrateToAppGroupView.swift; sourceTree = ""; }; @@ -800,7 +798,6 @@ isa = PBXGroup; children = ( 5CDCAD5128186DE400503DA2 /* SimpleX NSE.entitlements */, - 5CF9371D2B23429500E1D781 /* ConcurrentQueue.swift */, 5CDCAD472818589900503DA2 /* NotificationService.swift */, 5CDCAD492818589900503DA2 /* Info.plist */, 5CB0BA862826CB3A00B3292C /* InfoPlist.strings */, @@ -1285,7 +1282,6 @@ files = ( 5CDCAD482818589900503DA2 /* NotificationService.swift in Sources */, 5CFE0922282EEAF60002594B /* ZoomableScrollView.swift in Sources */, - 5CF9371E2B23429500E1D781 /* ConcurrentQueue.swift in Sources */, ); runOnlyForDeploymentPostprocessing = 0; }; diff --git a/apps/ios/SimpleXChat/ChatTypes.swift b/apps/ios/SimpleXChat/ChatTypes.swift index b74a2517c7..5e1c0ac538 100644 --- a/apps/ios/SimpleXChat/ChatTypes.swift +++ b/apps/ios/SimpleXChat/ChatTypes.swift @@ -2120,7 +2120,7 @@ public enum ConnectionEntity: Decodable { public var id: String? { switch self { case let .rcvDirectMsgConnection(contact): - return contact?.id ?? nil + return contact?.id case let .rcvGroupMsgConnection(_, groupMember): return groupMember.id case let .userContactConnection(userContact):