ios: NotificationService refactoring (#3961)

* ios: NotificationService refactoring

* refactor

* removed unused code

---------

Co-authored-by: Evgeny Poberezkin <evgeny@poberezkin.com>
This commit is contained in:
Stanislav Dmitrenko
2024-03-29 00:28:10 +07:00
committed by GitHub
parent 366b58ead5
commit b64ae90850
4 changed files with 63 additions and 199 deletions

View File

@@ -1,64 +0,0 @@
//
// ConcurrentQueue.swift
// SimpleX NSE
//
// Created by Evgeny on 08/12/2023.
// Copyright © 2023 SimpleX Chat. All rights reserved.
//
import Foundation
struct DequeueElement<T> {
var elementId: UUID?
var task: Task<T?, Never>
}
class ConcurrentQueue<T> {
private var queue: [T] = []
private var queueLock = DispatchQueue(label: "chat.simplex.app.SimpleX-NSE.concurrent-queue.lock.\(UUID())")
private var continuations = [(elementId: UUID, continuation: CheckedContinuation<T?, Never>)]()
func enqueue(_ el: T) {
resumeContinuation(el) { self.queue.append(el) }
}
func frontEnqueue(_ el: T) {
resumeContinuation(el) { self.queue.insert(el, at: 0) }
}
private func resumeContinuation(_ el: T, add: @escaping () -> Void) {
queueLock.sync {
if let (_, cont) = continuations.first {
continuations.remove(at: 0)
cont.resume(returning: el)
} else {
add()
}
}
}
func dequeue() -> DequeueElement<T> {
queueLock.sync {
if queue.isEmpty {
let elementId = UUID()
let task = Task {
await withCheckedContinuation { cont in
continuations.append((elementId, cont))
}
}
return DequeueElement(elementId: elementId, task: task)
} else {
let el = queue.remove(at: 0)
return DequeueElement(task: Task { el })
}
}
}
func cancelDequeue(_ elementId: UUID) {
queueLock.sync {
let cancelled = continuations.filter { $0.elementId == elementId }
continuations.removeAll { $0.elementId == elementId }
cancelled.forEach { $0.continuation.resume(returning: nil) }
}
}
}

View File

@@ -22,110 +22,6 @@ let nseSuspendSchedule: SuspendSchedule = (2, 4)
let fastNSESuspendSchedule: SuspendSchedule = (1, 1)
typealias NtfStream = ConcurrentQueue<NSENotification>
// Notifications are delivered via concurrent queues, as they are all received from chat controller in a single loop that
// writes to ConcurrentQueue and when notification is processed, the instance of Notification service extension reads from the queue.
// One queue per connection (entity) is used.
// The concurrent queues allow read cancellation, to ensure that notifications are not lost in case the current thread completes
// before expected notification is read (multiple notifications can be expected, because one notification can be delivered for several messages).
actor PendingNtfs {
static let shared = PendingNtfs()
private var ntfStreams: [String: NtfStream] = [:]
func createStream(_ id: String) async {
logger.debug("NotificationService PendingNtfs.createStream: \(id)")
if ntfStreams[id] == nil {
ntfStreams[id] = ConcurrentQueue()
logger.debug("NotificationService PendingNtfs.createStream: created ConcurrentQueue")
}
}
func readStream(_ id: String, for nse: NotificationService, ntfInfo: NtfMessages) async {
logger.debug("NotificationService PendingNtfs.readStream: \(id) \(ntfInfo.ntfMessages.count)")
if !ntfInfo.user.showNotifications {
nse.setBestAttemptNtf(.empty)
}
if let s = ntfStreams[id] {
logger.debug("NotificationService PendingNtfs.readStream: has stream")
var expected = Set(ntfInfo.ntfMessages.map { $0.msgId })
logger.debug("NotificationService PendingNtfs.readStream: expecting: \(expected)")
var readCancelled = false
var dequeued: DequeueElement<NSENotification>?
nse.cancelRead = {
readCancelled = true
if let elementId = dequeued?.elementId {
s.cancelDequeue(elementId)
}
}
while !readCancelled {
dequeued = s.dequeue()
if let ntf = await dequeued?.task.value {
if readCancelled {
logger.debug("NotificationService PendingNtfs.readStream: read cancelled, put ntf to queue front")
s.frontEnqueue(ntf)
break
} else if case let .msgInfo(info) = ntf {
let found = expected.remove(info.msgId)
if found != nil {
logger.debug("NotificationService PendingNtfs.readStream: msgInfo, last: \(expected.isEmpty)")
if expected.isEmpty { break }
} else if let msgTs = ntfInfo.msgTs, info.msgTs > msgTs {
logger.debug("NotificationService PendingNtfs.readStream: unexpected msgInfo")
s.frontEnqueue(ntf)
break
}
} else if ntfInfo.user.showNotifications {
logger.debug("NotificationService PendingNtfs.readStream: setting best attempt")
nse.setBestAttemptNtf(ntf)
if ntf.isCallInvitation { break }
}
} else {
break
}
}
nse.cancelRead = nil
logger.debug("NotificationService PendingNtfs.readStream: exiting")
}
}
func writeStream(_ id: String, _ ntf: NSENotification) async {
logger.debug("NotificationService PendingNtfs.writeStream: \(id)")
if let s = ntfStreams[id] {
logger.debug("NotificationService PendingNtfs.writeStream: writing ntf")
s.enqueue(ntf)
}
}
}
// The current implementation assumes concurrent notification delivery and uses semaphores
// to process only one notification per connection (entity) at a time.
class NtfStreamSemaphores {
static let shared = NtfStreamSemaphores()
private static let queue = DispatchQueue(label: "chat.simplex.app.SimpleX-NSE.notification-semaphores.lock")
private var semaphores: [String: DispatchSemaphore] = [:]
func waitForStream(_ id: String) {
streamSemaphore(id, value: 0)?.wait()
}
func signalStreamReady(_ id: String) {
streamSemaphore(id, value: 1)?.signal()
}
// this function returns nil if semaphore is just created, so passed value shoud be coordinated with the desired end value of the semaphore
private func streamSemaphore(_ id: String, value: Int) -> DispatchSemaphore? {
NtfStreamSemaphores.queue.sync {
if let s = semaphores[id] {
return s
} else {
semaphores[id] = DispatchSemaphore(value: value)
return nil
}
}
}
}
enum NSENotification {
case nse(UNMutableNotificationContent)
case callkit(RcvCallInvitation)
@@ -149,7 +45,7 @@ class NSEThreads {
static let shared = NSEThreads()
private static let queue = DispatchQueue(label: "chat.simplex.app.SimpleX-NSE.notification-threads.lock")
private var allThreads: Set<UUID> = []
private var activeThreads: Set<UUID> = []
private var activeThreads: [(UUID, NotificationService)] = []
func newThread() -> UUID {
NSEThreads.queue.sync {
@@ -158,19 +54,38 @@ class NSEThreads {
}
}
func startThread(_ t: UUID) {
func startThread(_ t: UUID, _ service: NotificationService) {
NSEThreads.queue.sync {
if allThreads.contains(t) {
_ = activeThreads.insert(t)
activeThreads.append((t, service))
} else {
logger.warning("NotificationService startThread: thread \(t) was removed before it started")
}
}
}
func processNotification(_ id: ChatId, _ ntf: NSENotification) async -> Void {
var timeoutOfWaiting: Int64 = 5_000_000000
while timeoutOfWaiting > 0 {
let activeThread = NSEThreads.queue.sync {
activeThreads.first(where: { (_, nse) in nse.receiveEntityId == id })
}
if activeThread?.1.processReceivedNtf?(ntf) == true {
break
} else {
try? await Task.sleep(nanoseconds: 10_000000)
timeoutOfWaiting -= 10_000000
}
}
}
func endThread(_ t: UUID) -> Bool {
NSEThreads.queue.sync {
let tActive = activeThreads.remove(t)
let tActive: UUID? = if let index = activeThreads.firstIndex(where: { $0.0 == t }) {
activeThreads.remove(at: index).0
} else {
nil
}
let t = allThreads.remove(t)
if tActive != nil && activeThreads.isEmpty {
return true
@@ -199,7 +114,8 @@ class NotificationService: UNNotificationServiceExtension {
// chat does not need to be suspended but NSE state still needs to be set to "suspended".
var threadId: UUID? = NSEThreads.shared.newThread()
var receiveEntityId: String?
var cancelRead: (() -> Void)?
// return true if the message is taken - it prevents sending it to another NotificationService instance for processing
var processReceivedNtf: ((NSENotification) -> Bool)?
var appSubscriber: AppSubscriber?
var returnedSuspension = false
@@ -265,7 +181,7 @@ class NotificationService: UNNotificationServiceExtension {
// check it here again
appStateGroupDefault.get().inactive {
// thread is added to activeThreads tracking set here - if thread started chat it needs to be suspended
if let t = threadId { NSEThreads.shared.startThread(t) }
if let t = threadId { NSEThreads.shared.startThread(t, self) }
let dbStatus = startChat()
if case .ok = dbStatus,
let ntfInfo = apiGetNtfMessage(nonce: nonce, encNtfInfo: encNtfInfo) {
@@ -276,17 +192,39 @@ class NotificationService: UNNotificationServiceExtension {
? .nse(createConnectionEventNtf(ntfInfo.user, connEntity))
: .empty
)
if let id = connEntity.id {
receiveEntityId = id
NtfStreamSemaphores.shared.waitForStream(id)
if receiveEntityId != nil {
Task {
logger.debug("NotificationService: receiveNtfMessages: in Task, connEntity id \(id)")
await PendingNtfs.shared.createStream(id)
await PendingNtfs.shared.readStream(id, for: self, ntfInfo: ntfInfo)
deliverBestAttemptNtf()
if let id = connEntity.id, let msgTs = ntfInfo.msgTs {
var expected = Set(ntfInfo.ntfMessages.map { $0.msgId })
processReceivedNtf = { ntf in
if !ntfInfo.user.showNotifications {
self.setBestAttemptNtf(.empty)
}
if case let .msgInfo(info) = ntf {
let found = expected.remove(info.msgId)
if found != nil {
logger.debug("NotificationService processNtf: msgInfo, last: \(expected.isEmpty)")
if expected.isEmpty {
self.deliverBestAttemptNtf()
}
return true
} else if info.msgTs > msgTs {
logger.debug("NotificationService processNtf: unexpected msgInfo, let other instance to process it, stopping this one")
self.deliverBestAttemptNtf()
return false
} else {
logger.debug("NotificationService processNtf: unknown message, let other instance to process it")
return false
}
} else if ntfInfo.user.showNotifications {
logger.debug("NotificationService processNtf: setting best attempt")
self.setBestAttemptNtf(ntf)
if ntf.isCallInvitation {
self.deliverBestAttemptNtf()
}
return true
}
return false
}
receiveEntityId = id
return
}
}
@@ -323,14 +261,9 @@ class NotificationService: UNNotificationServiceExtension {
private func deliverBestAttemptNtf(urgent: Bool = false) {
logger.debug("NotificationService.deliverBestAttemptNtf")
if let cancel = cancelRead {
cancelRead = nil
cancel()
}
if let id = receiveEntityId {
receiveEntityId = nil
NtfStreamSemaphores.shared.signalStreamReady(id)
}
// stop processing other messages
processReceivedNtf = nil
let suspend: Bool
if let t = threadId {
threadId = nil
@@ -572,7 +505,7 @@ func chatSuspended() {
}
// A single loop is used per Notification service extension process to receive and process all messages depending on the NSE state
// If the extension is not active yet, or suspended/suspending, or the app is running, the notifications will no be received.
// If the extension is not active yet, or suspended/suspending, or the app is running, the notifications will not be received.
func receiveMessages() async {
logger.debug("NotificationService receiveMessages")
while true {
@@ -591,8 +524,7 @@ func receiveMessages() async {
logger.debug("NotificationService receiveMsg: message")
if let (id, ntf) = await receivedMsgNtf(msg) {
logger.debug("NotificationService receiveMsg: notification")
await PendingNtfs.shared.createStream(id)
await PendingNtfs.shared.writeStream(id, ntf)
await NSEThreads.shared.processNotification(id, ntf)
}
}
}

View File

@@ -144,7 +144,6 @@
5CEACCED27DEA495000BD591 /* MsgContentView.swift in Sources */ = {isa = PBXBuildFile; fileRef = 5CEACCEC27DEA495000BD591 /* MsgContentView.swift */; };
5CEBD7462A5C0A8F00665FE2 /* KeyboardPadding.swift in Sources */ = {isa = PBXBuildFile; fileRef = 5CEBD7452A5C0A8F00665FE2 /* KeyboardPadding.swift */; };
5CEBD7482A5F115D00665FE2 /* SetDeliveryReceiptsView.swift in Sources */ = {isa = PBXBuildFile; fileRef = 5CEBD7472A5F115D00665FE2 /* SetDeliveryReceiptsView.swift */; };
5CF9371E2B23429500E1D781 /* ConcurrentQueue.swift in Sources */ = {isa = PBXBuildFile; fileRef = 5CF9371D2B23429500E1D781 /* ConcurrentQueue.swift */; };
5CF937202B24DE8C00E1D781 /* SharedFileSubscriber.swift in Sources */ = {isa = PBXBuildFile; fileRef = 5CF9371F2B24DE8C00E1D781 /* SharedFileSubscriber.swift */; };
5CF937232B2503D000E1D781 /* NSESubscriber.swift in Sources */ = {isa = PBXBuildFile; fileRef = 5CF937212B25034A00E1D781 /* NSESubscriber.swift */; };
5CFA59C42860BC6200863A68 /* MigrateToAppGroupView.swift in Sources */ = {isa = PBXBuildFile; fileRef = 5CFA59C32860BC6200863A68 /* MigrateToAppGroupView.swift */; };
@@ -437,7 +436,6 @@
5CEACCEC27DEA495000BD591 /* MsgContentView.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = MsgContentView.swift; sourceTree = "<group>"; };
5CEBD7452A5C0A8F00665FE2 /* KeyboardPadding.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = KeyboardPadding.swift; sourceTree = "<group>"; };
5CEBD7472A5F115D00665FE2 /* SetDeliveryReceiptsView.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = SetDeliveryReceiptsView.swift; sourceTree = "<group>"; };
5CF9371D2B23429500E1D781 /* ConcurrentQueue.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ConcurrentQueue.swift; sourceTree = "<group>"; };
5CF9371F2B24DE8C00E1D781 /* SharedFileSubscriber.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = SharedFileSubscriber.swift; sourceTree = "<group>"; };
5CF937212B25034A00E1D781 /* NSESubscriber.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = NSESubscriber.swift; sourceTree = "<group>"; };
5CFA59C32860BC6200863A68 /* MigrateToAppGroupView.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = MigrateToAppGroupView.swift; sourceTree = "<group>"; };
@@ -800,7 +798,6 @@
isa = PBXGroup;
children = (
5CDCAD5128186DE400503DA2 /* SimpleX NSE.entitlements */,
5CF9371D2B23429500E1D781 /* ConcurrentQueue.swift */,
5CDCAD472818589900503DA2 /* NotificationService.swift */,
5CDCAD492818589900503DA2 /* Info.plist */,
5CB0BA862826CB3A00B3292C /* InfoPlist.strings */,
@@ -1285,7 +1282,6 @@
files = (
5CDCAD482818589900503DA2 /* NotificationService.swift in Sources */,
5CFE0922282EEAF60002594B /* ZoomableScrollView.swift in Sources */,
5CF9371E2B23429500E1D781 /* ConcurrentQueue.swift in Sources */,
);
runOnlyForDeploymentPostprocessing = 0;
};

View File

@@ -2120,7 +2120,7 @@ public enum ConnectionEntity: Decodable {
public var id: String? {
switch self {
case let .rcvDirectMsgConnection(contact):
return contact?.id ?? nil
return contact?.id
case let .rcvGroupMsgConnection(_, groupMember):
return groupMember.id
case let .userContactConnection(userContact):