From 992950fde654c7ac6d2561ed58ebcb2ebf4de6cc Mon Sep 17 00:00:00 2001 From: "Evgeny @ SimpleX Chat" <259188159+evgeny-simplex@users.noreply.github.com> Date: Sun, 17 May 2026 09:46:20 +0000 Subject: [PATCH] use multiple queues and workers, remove semaphores and threads per notification --- src/Simplex/Messaging/Notifications/Server.hs | 98 ++++++++++++------- .../Messaging/Notifications/Server/Env.hs | 30 +++--- .../Notifications/Server/Push/APNS.hs | 2 +- 3 files changed, 80 insertions(+), 50 deletions(-) diff --git a/src/Simplex/Messaging/Notifications/Server.hs b/src/Simplex/Messaging/Notifications/Server.hs index 5bd6e2064..f2ed2d6ea 100644 --- a/src/Simplex/Messaging/Notifications/Server.hs +++ b/src/Simplex/Messaging/Notifications/Server.hs @@ -89,7 +89,6 @@ import UnliftIO (IOMode (..), UnliftIO, askUnliftIO, race_, unliftIO, withFile) import UnliftIO.Concurrent (forkIO, killThread, mkWeakThreadId) import UnliftIO.Directory (doesFileExist, renameFile) import UnliftIO.Exception -import UnliftIO.MVar (withMVar) import UnliftIO.STM #if MIN_VERSION_base(4,18,0) import GHC.Conc (listThreads) @@ -117,7 +116,6 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg, startOptions} void $ forkIO $ resubscribe s raceAny_ ( ntfSubscriber s - : ntfPush ps : periodicNtfsThread ps : map runServer transports <> serverStatsThread_ cfg @@ -148,12 +146,17 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg, startOptions} saveServer NtfSubscriber {smpSubscribers, smpAgent} <- asks subscriber liftIO $ readTVarIO smpSubscribers >>= mapM_ stopSubscriber + NtfPushServer {pushWorkers} <- asks pushServer + liftIO $ readTVarIO pushWorkers >>= mapM_ stopPushWorker liftIO $ closeSMPClientAgent smpAgent logNote "Server stopped" where stopSubscriber v = atomically (tryReadTMVar $ sessionVar v) >>= mapM (deRefWeak . subThreadId >=> mapM_ killThread) + stopPushWorker v = + atomically (tryReadTMVar $ sessionVar v) + >>= mapM (deRefWeak . 
workerThreadId >=> mapM_ killThread) saveServer :: M () saveServer = asks store >>= liftIO . closeNtfDbStore >> saveServerStats @@ -258,7 +261,7 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg, startOptions} let threadsCount = 0 #endif let NtfSubscriber {smpSubscribers, smpAgent = a} = subscriber - NtfPushServer {pushQ} = pushServer + NtfPushServer {pushWorkers} = pushServer SMPClientAgent {smpClients, smpSessions, smpSubWorkers} = a srvSubscribers <- getSMPWorkerMetrics a smpSubscribers srvClients <- getSMPWorkerMetrics a smpClients @@ -268,7 +271,7 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg, startOptions} ntfPendingServiceSubs <- getSMPServiceSubMetrics a pendingServiceSubs snd ntfPendingQueueSubs <- getSMPSubMetrics a pendingQueueSubs smpSessionCount <- M.size <$> readTVarIO smpSessions - apnsPushQLength <- atomically $ lengthTBQueue pushQ + apnsPushQLength <- pushWorkersQLength pushWorkers pure NtfRealTimeMetrics { threadsCount, @@ -527,9 +530,9 @@ ntfSubscriber NtfSubscriber {smpAgent = ca@SMPClientAgent {msgQ, agentQ}} = where receiveSMP = do st <- asks store - NtfPushServer {pushQ} <- asks pushServer + ps <- asks pushServer stats <- asks serverStats - liftIO $ forever $ do + forever $ do ((_, srv@(SMPServer (h :| _) _ _), _), _thVersion, sessionId, ts) <- atomically $ readTBQueue msgQ forM ts $ \(ntfId, t) -> case t of STUnexpectedError e -> logError $ "SMP client unexpected error: " <> tshow e -- uncorrelated response, should not happen @@ -538,24 +541,25 @@ ntfSubscriber NtfSubscriber {smpAgent = ca@SMPClientAgent {msgQ, agentQ}} = let smpQueue = SMPQueueNtf srv ntfId case msgOrErr of Right (SMP.NMSG nmsgNonce encNMsgMeta) -> do - ntfTs <- getSystemTime - updatePeriodStats (activeSubs stats) ntfId + ntfTs <- liftIO getSystemTime + liftIO $ updatePeriodStats (activeSubs stats) ntfId let newNtf = PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta} - srvHost_ = if isOwnServer ca srv then Just (safeDecodeUtf8 
$ strEncode h) else Nothing - addTokenLastNtf st newNtf >>= \case + srvHost = safeDecodeUtf8 $ strEncode h + isOwn = isOwnServer ca srv + liftIO (addTokenLastNtf st newNtf) >>= \case Right (tkn, lastNtfs) -> do - atomically $ writeTBQueue pushQ (srvHost_, tkn, PNMessage lastNtfs) - incNtfStat_ stats ntfReceived - mapM_ (`incServerStat` ntfReceivedOwn stats) srvHost_ - Left AUTH -> do + pushNotification ps (Just srvHost) isOwn tkn $ PNMessage lastNtfs + liftIO $ incNtfStat_ stats ntfReceived + when isOwn $ liftIO $ incServerStat srvHost (ntfReceivedOwn stats) + Left AUTH -> liftIO $ do incNtfStat_ stats ntfReceivedAuth - mapM_ (`incServerStat` ntfReceivedAuthOwn stats) srvHost_ + when isOwn $ incServerStat srvHost (ntfReceivedAuthOwn stats) Left _ -> pure () Right SMP.END -> whenM (atomically $ activeClientSession' ca sessionId srv) $ - void $ updateSrvSubStatus st smpQueue NSEnd + void $ liftIO $ updateSrvSubStatus st smpQueue NSEnd Right SMP.DELD -> - void $ updateSrvSubStatus st smpQueue NSDeleted + void $ liftIO $ updateSrvSubStatus st smpQueue NSDeleted Right (SMP.ERR e) -> logError $ "SMP server error: " <> tshow e Right _ -> logError "SMP server unexpected response" Left e -> logError $ "SMP client error: " <> tshow e @@ -633,13 +637,28 @@ logSubStatus srv event n updated = showServer' :: SMPServer -> Text showServer' = decodeLatin1 . strEncode . 
host -ntfPush :: NtfPushServer -> M () -ntfPush s@NtfPushServer {pushQ, srvDeliveryLocks} = forever $ do - (srvHost_, tkn@NtfTknRec {ntfTknId, token = t@(DeviceToken pp _), tknStatus}, ntf) <- atomically (readTBQueue pushQ) +pushNotification :: NtfPushServer -> Maybe T.Text -> OwnServer -> NtfTknRec -> PushNotification -> M () +pushNotification s srvHost_ isOwn tkn ntf = do + q <- getOrCreatePushWorker s srvHost_ isOwn + atomically $ writeTBQueue q (tkn, ntf) + +getOrCreatePushWorker :: NtfPushServer -> Maybe T.Text -> OwnServer -> M (TBQueue (NtfTknRec, PushNotification)) +getOrCreatePushWorker s@NtfPushServer {pushWorkers, pushWorkerSeq, pushQSize} srvHost_ isOwn = do + ts <- liftIO getCurrentTime + atomically (getSessVar pushWorkerSeq srvHost_ pushWorkers ts) >>= \case + Left v -> do + q <- liftIO $ newTBQueueIO pushQSize + tId <- mkWeakThreadId =<< forkIO (runPushWorker s srvHost_ isOwn q) + atomically $ putTMVar (sessionVar v) PushWorker {workerQ = q, workerThreadId = tId} + pure q + Right v -> workerQ <$> atomically (readTMVar $ sessionVar v) + +runPushWorker :: NtfPushServer -> Maybe T.Text -> OwnServer -> TBQueue (NtfTknRec, PushNotification) -> M () +runPushWorker s srvHost_ isOwn q = forever $ handleAny (\e -> liftIO $ logError $ "push worker error: " <> tshow e) $ do {- a synchronous exception must not end this loop: the worker is the only reader of q, so a dead worker lets the bounded queue fill up and then blocks every writeTBQueue caller -} + (tkn@NtfTknRec {ntfTknId, token = t@(DeviceToken pp _), tknStatus}, ntf) <- atomically (readTBQueue q) liftIO $ logDebug $ "sending push notification to " <> T.pack (show pp) - lock <- liftIO $ getDeliveryLock srvDeliveryLocks srvHost_ st <- asks store - void $ forkIO $ withMVar lock $ \_ -> case ntf of + case ntf of PNVerification _ -> liftIO (deliverNotification st pp tkn ntf) >>= \case Right _ -> do @@ -658,19 +677,19 @@ ntfPush s@NtfPushServer {pushQ, srvDeliveryLocks} = forever $ do liftIO (deliverNotification st pp tkn ntf) >>= \case Left _ -> do incNtfStatT t ntfFailed - liftIO $ mapM_ (`incServerStat` ntfFailedOwn stats) srvHost_ + when isOwn $ liftIO $ mapM_ (`incServerStat` ntfFailedOwn stats) srvHost_ Right () -> do incNtfStatT t ntfDelivered - 
liftIO $ mapM_ (`incServerStat` ntfDeliveredOwn stats) srvHost_ + when isOwn $ liftIO $ mapM_ (`incServerStat` ntfDeliveredOwn stats) srvHost_ where checkActiveTkn :: NtfTknStatus -> M () -> M () checkActiveTkn status action | status == NTActive = action | otherwise = liftIO $ logError "bad notification token status" deliverNotification :: NtfPostgresStore -> PushProvider -> NtfTknRec -> PushNotification -> IO (Either PushProviderError ()) - deliverNotification st pp tkn@NtfTknRec {ntfTknId} ntf = do + deliverNotification st pp tkn@NtfTknRec {ntfTknId} ntf' = do (deliver, clientVar) <- getPushClient s pp - runExceptT (deliver tkn ntf) >>= \case + runExceptT (deliver tkn ntf') >>= \case Right _ -> pure $ Right () Left e -> case e of PPConnection ce -> retryDeliver clientVar $ "connection " <> tshow ce @@ -682,14 +701,12 @@ ntfPush s@NtfPushServer {pushQ, srvDeliveryLocks} = forever $ do err e PPPermanentError -> err e where - -- removeSessVar checks identity, so concurrent retries collapse: the first - -- one removes the failing client, subsequent ones observe a fresh replacement retryDeliver :: PushClientVar -> Text -> IO (Either PushProviderError ()) retryDeliver oldVar reason = do logWarn $ "retrying push (" <> tshow pp <> ", " <> tshow ntfTknId <> "): " <> reason atomically $ removeSessVar oldVar pp (pushClients s) (deliver, _) <- getPushClient s pp - runExceptT (deliver tkn ntf) >>= \case + runExceptT (deliver tkn ntf') >>= \case Right _ -> pure $ Right () Left e -> case e of PPTokenInvalid r -> do @@ -698,15 +715,26 @@ ntfPush s@NtfPushServer {pushQ, srvDeliveryLocks} = forever $ do _ -> err e err e = logError ("Push provider error (" <> tshow pp <> ", " <> tshow ntfTknId <> "): " <> tshow e) $> Left e +pushWorkersQLength :: TMap (Maybe T.Text) PushWorkerVar -> IO Natural +pushWorkersQLength workers = do + ws <- readTVarIO workers + foldM addQLength 0 ws + where + addQLength acc v = + atomically (tryReadTMVar $ sessionVar v) >>= \case + Just PushWorker 
{workerQ} -> (acc +) <$> atomically (lengthTBQueue workerQ) + Nothing -> pure acc + periodicNtfsThread :: NtfPushServer -> M () -periodicNtfsThread NtfPushServer {pushQ} = do +periodicNtfsThread s = do st <- asks store ntfsInterval <- asks $ periodicNtfsInterval . config let interval = 1000000 * ntfsInterval + q <- getOrCreatePushWorker s Nothing False liftIO $ forever $ do threadDelay interval now <- systemSeconds <$> getSystemTime - cnt <- withPeriodicNtfTokens st now $ \tkn -> atomically $ writeTBQueue pushQ (Nothing, tkn, PNCheckMessages) + cnt <- withPeriodicNtfTokens st now $ \tkn -> atomically $ writeTBQueue q (tkn, PNCheckMessages) logNote $ "Scheduled periodic notifications: " <> tshow cnt runNtfClientTransport :: Transport c => THandleNTF c 'TServer -> M () @@ -796,7 +824,7 @@ verifyNtfTransmission st thAuth (tAuth, authorized, (corrId, entId, cmd)) = case e -> VRFailed e client :: NtfServerClient -> NtfSubscriber -> NtfPushServer -> M () -client NtfServerClient {rcvQ, sndQ} ns@NtfSubscriber {smpAgent = ca} NtfPushServer {pushQ} = +client NtfServerClient {rcvQ, sndQ} ns@NtfSubscriber {smpAgent = ca} ps = forever $ atomically (readTBQueue rcvQ) >>= mapM processCommand @@ -813,7 +841,7 @@ client NtfServerClient {rcvQ, sndQ} ns@NtfSubscriber {smpAgent = ca} NtfPushServ ts <- liftIO $ getSystemDate let tkn = mkNtfTknRec tknId newTkn srvDhPrivKey dhSecret regCode ts withNtfStore (`addNtfToken` tkn) $ \_ -> do - atomically $ writeTBQueue pushQ (Nothing, tkn, PNVerification regCode) + pushNotification ps Nothing False tkn $ PNVerification regCode incNtfStatT token ntfVrfQueued incNtfStatT token tknCreated pure $ NRTknId tknId srvDhPubKey @@ -829,7 +857,7 @@ client NtfServerClient {rcvQ, sndQ} ns@NtfSubscriber {smpAgent = ca} NtfPushServ | otherwise -> withNtfStore (\st -> updateTknStatus st tkn NTRegistered) $ \_ -> sendVerification where sendVerification = do - atomically $ writeTBQueue pushQ (Nothing, tkn, PNVerification tknRegCode) + pushNotification ps 
Nothing False tkn $ PNVerification tknRegCode incNtfStatT token ntfVrfQueued pure $ NRTknId ntfTknId $ C.publicKey tknDhPrivKey TVFY code -- this allows repeated verification for cases when client connection dropped before server response @@ -847,7 +875,7 @@ client NtfServerClient {rcvQ, sndQ} ns@NtfSubscriber {smpAgent = ca} NtfPushServ regCode <- getRegCode let tkn' = tkn {token = token', tknStatus = NTRegistered, tknRegCode = regCode} withNtfStore (`replaceNtfToken` tkn') $ \_ -> do - atomically $ writeTBQueue pushQ (Nothing, tkn', PNVerification regCode) + pushNotification ps Nothing False tkn' $ PNVerification regCode incNtfStatT token ntfVrfQueued incNtfStatT token tknReplaced pure NROk diff --git a/src/Simplex/Messaging/Notifications/Server/Env.hs b/src/Simplex/Messaging/Notifications/Server/Env.hs index 626c36a41..2bf413346 100644 --- a/src/Simplex/Messaging/Notifications/Server/Env.hs +++ b/src/Simplex/Messaging/Notifications/Server/Env.hs @@ -14,6 +14,8 @@ module Simplex.Messaging.Notifications.Server.Env SMPSubscriber (..), NtfPushServer (..), PushClientVar, + PushWorker (..), + PushWorkerVar, NtfRequest (..), NtfServerClient (..), defaultInactiveClientExpiration, @@ -21,7 +23,6 @@ module Simplex.Messaging.Notifications.Server.Env newNtfSubscriber, newNtfPushServer, getPushClient, - getDeliveryLock, newNtfServerClient, ) where @@ -63,7 +64,6 @@ import Simplex.Messaging.Transport (ASrvTransport, SMPServiceRole (..), ServiceC import Simplex.Messaging.Transport.Server (AddHTTP, ServerCredentials, TransportServerConfig, loadFingerprint, loadServerCredential) import System.Exit (exitFailure) import System.Mem.Weak (Weak) -import UnliftIO.MVar import UnliftIO.STM data NtfServerConfig = NtfServerConfig @@ -167,32 +167,34 @@ data SMPSubscriber = SMPSubscriber } data NtfPushServer = NtfPushServer - { pushQ :: TBQueue (Maybe T.Text, NtfTknRec, PushNotification), -- Maybe Text is a hostname of "own" server + { pushWorkers :: TMap (Maybe T.Text) PushWorkerVar, -- 
keyed by SMP server hostname, Nothing for non-server notifications + pushWorkerSeq :: TVar Int, + pushQSize :: Natural, pushClients :: TMap PushProvider PushClientVar, pushClientSeq :: TVar Int, - -- one lock per srvHost_ serializes per-server delivery while different servers proceed in parallel - srvDeliveryLocks :: TMap (Maybe T.Text) (MVar ()), apnsConfig :: APNSPushClientConfig } +data PushWorker = PushWorker + { workerQ :: TBQueue (NtfTknRec, PushNotification), + workerThreadId :: Weak ThreadId + } + +type PushWorkerVar = SessionVar PushWorker + -- The Either communicates client-creation failure from the winner to the waiters. type PushClientVar = SessionVar (Either E.SomeException PushProviderClient) newNtfPushServer :: Natural -> APNSPushClientConfig -> IO NtfPushServer -newNtfPushServer qSize apnsConfig = do - pushQ <- newTBQueueIO qSize +newNtfPushServer pushQSize apnsConfig = do + pushWorkers <- TM.emptyIO + pushWorkerSeq <- newTVarIO 0 pushClients <- TM.emptyIO pushClientSeq <- newTVarIO 0 - srvDeliveryLocks <- TM.emptyIO - pure NtfPushServer {pushQ, pushClients, pushClientSeq, srvDeliveryLocks, apnsConfig} + pure NtfPushServer {pushWorkers, pushWorkerSeq, pushQSize, pushClients, pushClientSeq, apnsConfig} -getDeliveryLock :: TMap (Maybe T.Text) (MVar ()) -> Maybe T.Text -> IO (MVar ()) -getDeliveryLock locks k = do - newLock <- newMVar () - atomically $ TM.lookup k locks >>= maybe (TM.insert k newLock locks $> newLock) pure -- | Single-flight access to the per-provider push client. --- take (getSessVar) → create (newPushClient) or wait (waitForPushClient). -- The returned PushClientVar is the handle retryDeliver passes to removeSessVar to evict -- this specific instance before re-fetching. 
getPushClient :: NtfPushServer -> PushProvider -> IO (PushProviderClient, PushClientVar) diff --git a/src/Simplex/Messaging/Notifications/Server/Push/APNS.hs b/src/Simplex/Messaging/Notifications/Server/Push/APNS.hs index 728183db1..e7e377f5c 100644 --- a/src/Simplex/Messaging/Notifications/Server/Push/APNS.hs +++ b/src/Simplex/Messaging/Notifications/Server/Push/APNS.hs @@ -343,7 +343,7 @@ apnsPushProviderClient c@APNSPushClient {nonceDrg, apnsCfg} tkn@NtfTknRec {token nonce <- atomically $ C.randomCbNonce nonceDrg apnsNtf <- liftEither $ first PPCryptoError $ apnsNotification tkn nonce (paddedNtfLength apnsCfg) pn req <- liftIO $ apnsRequest c tknStr apnsNtf - HTTP2Response {response, respBody = HTTP2Body {bodyHead}} <- liftHTTPS2 $ sendRequestDirect http2 req Nothing + HTTP2Response {response, respBody = HTTP2Body {bodyHead}} <- liftHTTPS2 $ sendRequest http2 req Nothing let status = H.responseStatus response reason' = maybe "" reason $ J.decodeStrict' bodyHead if status == Just N.ok200