use multiple queues and workers, remove semaphores and threads per notification

This commit is contained in:
Evgeny @ SimpleX Chat
2026-05-17 09:46:20 +00:00
parent ab6fcc7aab
commit c1c9796d3b
3 changed files with 85 additions and 46 deletions
+56 -31
View File
@@ -89,7 +89,6 @@ import UnliftIO (IOMode (..), UnliftIO, askUnliftIO, race_, unliftIO, withFile)
import UnliftIO.Concurrent (forkIO, killThread, mkWeakThreadId)
import UnliftIO.Directory (doesFileExist, renameFile)
import UnliftIO.Exception
import UnliftIO.MVar (withMVar)
import UnliftIO.STM
#if MIN_VERSION_base(4,18,0)
import GHC.Conc (listThreads)
@@ -117,7 +116,6 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg, startOptions}
void $ forkIO $ resubscribe s
raceAny_
( ntfSubscriber s
: ntfPush ps
: periodicNtfsThread ps
: map runServer transports
<> serverStatsThread_ cfg
@@ -148,12 +146,17 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg, startOptions}
saveServer
NtfSubscriber {smpSubscribers, smpAgent} <- asks subscriber
liftIO $ readTVarIO smpSubscribers >>= mapM_ stopSubscriber
NtfPushServer {pushWorkers} <- asks pushServer
liftIO $ readTVarIO pushWorkers >>= mapM_ stopPushWorker
liftIO $ closeSMPClientAgent smpAgent
logNote "Server stopped"
where
stopSubscriber v =
atomically (tryReadTMVar $ sessionVar v)
>>= mapM (deRefWeak . subThreadId >=> mapM_ killThread)
stopPushWorker v =
atomically (tryReadTMVar $ sessionVar v)
>>= mapM (deRefWeak . workerThreadId >=> mapM_ killThread)
saveServer :: M ()
saveServer = asks store >>= liftIO . closeNtfDbStore >> saveServerStats
@@ -258,7 +261,7 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg, startOptions}
let threadsCount = 0
#endif
let NtfSubscriber {smpSubscribers, smpAgent = a} = subscriber
NtfPushServer {pushQ} = pushServer
NtfPushServer {pushWorkers} = pushServer
SMPClientAgent {smpClients, smpSessions, smpSubWorkers} = a
srvSubscribers <- getSMPWorkerMetrics a smpSubscribers
srvClients <- getSMPWorkerMetrics a smpClients
@@ -268,7 +271,7 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg, startOptions}
ntfPendingServiceSubs <- getSMPServiceSubMetrics a pendingServiceSubs snd
ntfPendingQueueSubs <- getSMPSubMetrics a pendingQueueSubs
smpSessionCount <- M.size <$> readTVarIO smpSessions
apnsPushQLength <- atomically $ lengthTBQueue pushQ
apnsPushQLength <- pushWorkersQLength pushWorkers
pure
NtfRealTimeMetrics
{ threadsCount,
@@ -527,9 +530,9 @@ ntfSubscriber NtfSubscriber {smpAgent = ca@SMPClientAgent {msgQ, agentQ}} =
where
receiveSMP = do
st <- asks store
NtfPushServer {pushQ} <- asks pushServer
ps <- asks pushServer
stats <- asks serverStats
liftIO $ forever $ do
forever $ do
((_, srv@(SMPServer (h :| _) _ _), _), _thVersion, sessionId, ts) <- atomically $ readTBQueue msgQ
forM ts $ \(ntfId, t) -> case t of
STUnexpectedError e -> logError $ "SMP client unexpected error: " <> tshow e -- uncorrelated response, should not happen
@@ -538,24 +541,24 @@ ntfSubscriber NtfSubscriber {smpAgent = ca@SMPClientAgent {msgQ, agentQ}} =
let smpQueue = SMPQueueNtf srv ntfId
case msgOrErr of
Right (SMP.NMSG nmsgNonce encNMsgMeta) -> do
ntfTs <- getSystemTime
updatePeriodStats (activeSubs stats) ntfId
ntfTs <- liftIO getSystemTime
liftIO $ updatePeriodStats (activeSubs stats) ntfId
let newNtf = PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta}
srvHost_ = if isOwnServer ca srv then Just (safeDecodeUtf8 $ strEncode h) else Nothing
addTokenLastNtf st newNtf >>= \case
liftIO (addTokenLastNtf st newNtf) >>= \case
Right (tkn, lastNtfs) -> do
atomically $ writeTBQueue pushQ (srvHost_, tkn, PNMessage lastNtfs)
incNtfStat_ stats ntfReceived
mapM_ (`incServerStat` ntfReceivedOwn stats) srvHost_
Left AUTH -> do
pushNotification ps srvHost_ tkn $ PNMessage lastNtfs
liftIO $ incNtfStat_ stats ntfReceived
liftIO $ mapM_ (`incServerStat` ntfReceivedOwn stats) srvHost_
Left AUTH -> liftIO $ do
incNtfStat_ stats ntfReceivedAuth
mapM_ (`incServerStat` ntfReceivedAuthOwn stats) srvHost_
Left _ -> pure ()
Right SMP.END ->
whenM (atomically $ activeClientSession' ca sessionId srv) $
void $ updateSrvSubStatus st smpQueue NSEnd
void $ liftIO $ updateSrvSubStatus st smpQueue NSEnd
Right SMP.DELD ->
void $ updateSrvSubStatus st smpQueue NSDeleted
void $ liftIO $ updateSrvSubStatus st smpQueue NSDeleted
Right (SMP.ERR e) -> logError $ "SMP server error: " <> tshow e
Right _ -> logError "SMP server unexpected response"
Left e -> logError $ "SMP client error: " <> tshow e
@@ -633,13 +636,26 @@ logSubStatus srv event n updated =
showServer' :: SMPServer -> Text
showServer' = decodeLatin1 . strEncode . host
ntfPush :: NtfPushServer -> M ()
ntfPush s@NtfPushServer {pushQ, srvDeliveryLocks} = forever $ do
(srvHost_, tkn@NtfTknRec {ntfTknId, token = t@(DeviceToken pp _), tknStatus}, ntf) <- atomically (readTBQueue pushQ)
pushNotification :: NtfPushServer -> Maybe T.Text -> NtfTknRec -> PushNotification -> M ()
pushNotification s srvHost_ tkn ntf = do
q <- pushWorkerQueue s srvHost_
atomically $ writeTBQueue q (tkn, ntf)
pushWorkerQueue :: NtfPushServer -> Maybe T.Text -> M (TBQueue (NtfTknRec, PushNotification))
pushWorkerQueue s srvHost_ = do
(q, v) <- liftIO $ getOrCreatePushWorker s srvHost_
workerExists <- liftIO . atomically $ not <$> isEmptyTMVar (sessionVar v)
unless workerExists $ do
tId <- mkWeakThreadId =<< forkIO (runPushWorker s srvHost_ q)
liftIO . atomically $ putTMVar (sessionVar v) PushWorker {workerQ = q, workerThreadId = tId}
pure q
runPushWorker :: NtfPushServer -> Maybe T.Text -> TBQueue (NtfTknRec, PushNotification) -> M ()
runPushWorker s srvHost_ q = forever $ do
(tkn@NtfTknRec {ntfTknId, token = t@(DeviceToken pp _), tknStatus}, ntf) <- atomically (readTBQueue q)
liftIO $ logDebug $ "sending push notification to " <> T.pack (show pp)
lock <- liftIO $ getDeliveryLock srvDeliveryLocks srvHost_
st <- asks store
void $ forkIO $ withMVar lock $ \_ -> case ntf of
case ntf of
PNVerification _ ->
liftIO (deliverNotification st pp tkn ntf) >>= \case
Right _ -> do
@@ -668,9 +684,9 @@ ntfPush s@NtfPushServer {pushQ, srvDeliveryLocks} = forever $ do
| status == NTActive = action
| otherwise = liftIO $ logError "bad notification token status"
deliverNotification :: NtfPostgresStore -> PushProvider -> NtfTknRec -> PushNotification -> IO (Either PushProviderError ())
deliverNotification st pp tkn@NtfTknRec {ntfTknId} ntf = do
deliverNotification st pp tkn@NtfTknRec {ntfTknId} ntf' = do
(deliver, clientVar) <- getPushClient s pp
runExceptT (deliver tkn ntf) >>= \case
runExceptT (deliver tkn ntf') >>= \case
Right _ -> pure $ Right ()
Left e -> case e of
PPConnection ce -> retryDeliver clientVar $ "connection " <> tshow ce
@@ -682,14 +698,12 @@ ntfPush s@NtfPushServer {pushQ, srvDeliveryLocks} = forever $ do
err e
PPPermanentError -> err e
where
-- removeSessVar checks identity, so concurrent retries collapse: the first
-- one removes the failing client, subsequent ones observe a fresh replacement
retryDeliver :: PushClientVar -> Text -> IO (Either PushProviderError ())
retryDeliver oldVar reason = do
logWarn $ "retrying push (" <> tshow pp <> ", " <> tshow ntfTknId <> "): " <> reason
atomically $ removeSessVar oldVar pp (pushClients s)
(deliver, _) <- getPushClient s pp
runExceptT (deliver tkn ntf) >>= \case
runExceptT (deliver tkn ntf') >>= \case
Right _ -> pure $ Right ()
Left e -> case e of
PPTokenInvalid r -> do
@@ -698,15 +712,26 @@ ntfPush s@NtfPushServer {pushQ, srvDeliveryLocks} = forever $ do
_ -> err e
err e = logError ("Push provider error (" <> tshow pp <> ", " <> tshow ntfTknId <> "): " <> tshow e) $> Left e
pushWorkersQLength :: TMap (Maybe T.Text) PushWorkerVar -> IO Natural
pushWorkersQLength workers = do
ws <- readTVarIO workers
foldM addQLength 0 ws
where
addQLength acc v =
atomically (tryReadTMVar $ sessionVar v) >>= \case
Just PushWorker {workerQ} -> (acc +) <$> atomically (lengthTBQueue workerQ)
Nothing -> pure acc
periodicNtfsThread :: NtfPushServer -> M ()
periodicNtfsThread NtfPushServer {pushQ} = do
periodicNtfsThread s = do
st <- asks store
ntfsInterval <- asks $ periodicNtfsInterval . config
let interval = 1000000 * ntfsInterval
q <- pushWorkerQueue s Nothing
liftIO $ forever $ do
threadDelay interval
now <- systemSeconds <$> getSystemTime
cnt <- withPeriodicNtfTokens st now $ \tkn -> atomically $ writeTBQueue pushQ (Nothing, tkn, PNCheckMessages)
cnt <- withPeriodicNtfTokens st now $ \tkn -> atomically $ writeTBQueue q (tkn, PNCheckMessages)
logNote $ "Scheduled periodic notifications: " <> tshow cnt
runNtfClientTransport :: Transport c => THandleNTF c 'TServer -> M ()
@@ -796,7 +821,7 @@ verifyNtfTransmission st thAuth (tAuth, authorized, (corrId, entId, cmd)) = case
e -> VRFailed e
client :: NtfServerClient -> NtfSubscriber -> NtfPushServer -> M ()
client NtfServerClient {rcvQ, sndQ} ns@NtfSubscriber {smpAgent = ca} NtfPushServer {pushQ} =
client NtfServerClient {rcvQ, sndQ} ns@NtfSubscriber {smpAgent = ca} ps =
forever $
atomically (readTBQueue rcvQ)
>>= mapM processCommand
@@ -813,7 +838,7 @@ client NtfServerClient {rcvQ, sndQ} ns@NtfSubscriber {smpAgent = ca} NtfPushServ
ts <- liftIO $ getSystemDate
let tkn = mkNtfTknRec tknId newTkn srvDhPrivKey dhSecret regCode ts
withNtfStore (`addNtfToken` tkn) $ \_ -> do
atomically $ writeTBQueue pushQ (Nothing, tkn, PNVerification regCode)
pushNotification ps Nothing tkn (PNVerification regCode)
incNtfStatT token ntfVrfQueued
incNtfStatT token tknCreated
pure $ NRTknId tknId srvDhPubKey
@@ -829,7 +854,7 @@ client NtfServerClient {rcvQ, sndQ} ns@NtfSubscriber {smpAgent = ca} NtfPushServ
| otherwise -> withNtfStore (\st -> updateTknStatus st tkn NTRegistered) $ \_ -> sendVerification
where
sendVerification = do
atomically $ writeTBQueue pushQ (Nothing, tkn, PNVerification tknRegCode)
pushNotification ps Nothing tkn (PNVerification tknRegCode)
incNtfStatT token ntfVrfQueued
pure $ NRTknId ntfTknId $ C.publicKey tknDhPrivKey
TVFY code -- this allows repeated verification for cases when client connection dropped before server response
@@ -847,7 +872,7 @@ client NtfServerClient {rcvQ, sndQ} ns@NtfSubscriber {smpAgent = ca} NtfPushServ
regCode <- getRegCode
let tkn' = tkn {token = token', tknStatus = NTRegistered, tknRegCode = regCode}
withNtfStore (`replaceNtfToken` tkn') $ \_ -> do
atomically $ writeTBQueue pushQ (Nothing, tkn', PNVerification regCode)
pushNotification ps Nothing tkn' (PNVerification regCode)
incNtfStatT token ntfVrfQueued
incNtfStatT token tknReplaced
pure NROk
@@ -14,6 +14,8 @@ module Simplex.Messaging.Notifications.Server.Env
SMPSubscriber (..),
NtfPushServer (..),
PushClientVar,
PushWorker (..),
PushWorkerVar,
NtfRequest (..),
NtfServerClient (..),
defaultInactiveClientExpiration,
@@ -21,7 +23,7 @@ module Simplex.Messaging.Notifications.Server.Env
newNtfSubscriber,
newNtfPushServer,
getPushClient,
getDeliveryLock,
getOrCreatePushWorker,
newNtfServerClient,
) where
@@ -63,7 +65,6 @@ import Simplex.Messaging.Transport (ASrvTransport, SMPServiceRole (..), ServiceC
import Simplex.Messaging.Transport.Server (AddHTTP, ServerCredentials, TransportServerConfig, loadFingerprint, loadServerCredential)
import System.Exit (exitFailure)
import System.Mem.Weak (Weak)
import UnliftIO.MVar
import UnliftIO.STM
data NtfServerConfig = NtfServerConfig
@@ -167,32 +168,45 @@ data SMPSubscriber = SMPSubscriber
}
data NtfPushServer = NtfPushServer
{ pushQ :: TBQueue (Maybe T.Text, NtfTknRec, PushNotification), -- Maybe Text is a hostname of "own" server
{ pushWorkers :: TMap (Maybe T.Text) PushWorkerVar,
pushWorkerSeq :: TVar Int,
pushQSize :: Natural,
pushClients :: TMap PushProvider PushClientVar,
pushClientSeq :: TVar Int,
-- one lock per srvHost_ serializes per-server delivery while different servers proceed in parallel
srvDeliveryLocks :: TMap (Maybe T.Text) (MVar ()),
apnsConfig :: APNSPushClientConfig
}
data PushWorker = PushWorker
{ workerQ :: TBQueue (NtfTknRec, PushNotification),
workerThreadId :: Weak ThreadId
}
type PushWorkerVar = SessionVar PushWorker
-- The Either communicates client-creation failure from the winner to the waiters.
type PushClientVar = SessionVar (Either E.SomeException PushProviderClient)
newNtfPushServer :: Natural -> APNSPushClientConfig -> IO NtfPushServer
newNtfPushServer qSize apnsConfig = do
pushQ <- newTBQueueIO qSize
newNtfPushServer pushQSize apnsConfig = do
pushWorkers <- TM.emptyIO
pushWorkerSeq <- newTVarIO 0
pushClients <- TM.emptyIO
pushClientSeq <- newTVarIO 0
srvDeliveryLocks <- TM.emptyIO
pure NtfPushServer {pushQ, pushClients, pushClientSeq, srvDeliveryLocks, apnsConfig}
pure NtfPushServer {pushWorkers, pushWorkerSeq, pushQSize, pushClients, pushClientSeq, apnsConfig}
getDeliveryLock :: TMap (Maybe T.Text) (MVar ()) -> Maybe T.Text -> IO (MVar ())
getDeliveryLock locks k = do
newLock <- newMVar ()
atomically $ TM.lookup k locks >>= maybe (TM.insert k newLock locks $> newLock) pure
-- | Get or create the per-srvHost_ push worker, returning its queue.
getOrCreatePushWorker :: NtfPushServer -> Maybe T.Text -> IO (TBQueue (NtfTknRec, PushNotification), PushWorkerVar)
getOrCreatePushWorker NtfPushServer {pushWorkers, pushWorkerSeq, pushQSize} srvHost_ = do
ts <- getCurrentTime
atomically (getSessVar pushWorkerSeq srvHost_ pushWorkers ts) >>= \case
Left v -> do
q <- newTBQueueIO pushQSize
pure (q, v)
Right v -> do
PushWorker {workerQ} <- atomically $ readTMVar (sessionVar v)
pure (workerQ, v)
-- | Single-flight access to the per-provider push client.
-- take (getSessVar) → create (newPushClient) or wait (waitForPushClient).
-- The returned PushClientVar is the handle retryDeliver passes to removeSessVar to evict
-- this specific instance before re-fetching.
getPushClient :: NtfPushServer -> PushProvider -> IO (PushProviderClient, PushClientVar)
@@ -343,7 +343,7 @@ apnsPushProviderClient c@APNSPushClient {nonceDrg, apnsCfg} tkn@NtfTknRec {token
nonce <- atomically $ C.randomCbNonce nonceDrg
apnsNtf <- liftEither $ first PPCryptoError $ apnsNotification tkn nonce (paddedNtfLength apnsCfg) pn
req <- liftIO $ apnsRequest c tknStr apnsNtf
HTTP2Response {response, respBody = HTTP2Body {bodyHead}} <- liftHTTPS2 $ sendRequestDirect http2 req Nothing
HTTP2Response {response, respBody = HTTP2Body {bodyHead}} <- liftHTTPS2 $ sendRequest http2 req Nothing
let status = H.responseStatus response
reason' = maybe "" reason $ J.decodeStrict' bodyHead
if status == Just N.ok200