use multiple queues and workers, remove semaphores and threads per notification

This commit is contained in:
Evgeny @ SimpleX Chat
2026-05-17 09:46:20 +00:00
parent ab6fcc7aab
commit 992950fde6
3 changed files with 80 additions and 50 deletions
+63 -35
View File
@@ -89,7 +89,6 @@ import UnliftIO (IOMode (..), UnliftIO, askUnliftIO, race_, unliftIO, withFile)
import UnliftIO.Concurrent (forkIO, killThread, mkWeakThreadId)
import UnliftIO.Directory (doesFileExist, renameFile)
import UnliftIO.Exception
import UnliftIO.MVar (withMVar)
import UnliftIO.STM
#if MIN_VERSION_base(4,18,0)
import GHC.Conc (listThreads)
@@ -117,7 +116,6 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg, startOptions}
void $ forkIO $ resubscribe s
raceAny_
( ntfSubscriber s
: ntfPush ps
: periodicNtfsThread ps
: map runServer transports
<> serverStatsThread_ cfg
@@ -148,12 +146,17 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg, startOptions}
saveServer
NtfSubscriber {smpSubscribers, smpAgent} <- asks subscriber
liftIO $ readTVarIO smpSubscribers >>= mapM_ stopSubscriber
NtfPushServer {pushWorkers} <- asks pushServer
liftIO $ readTVarIO pushWorkers >>= mapM_ stopPushWorker
liftIO $ closeSMPClientAgent smpAgent
logNote "Server stopped"
where
stopSubscriber v =
atomically (tryReadTMVar $ sessionVar v)
>>= mapM (deRefWeak . subThreadId >=> mapM_ killThread)
stopPushWorker v =
atomically (tryReadTMVar $ sessionVar v)
>>= mapM (deRefWeak . workerThreadId >=> mapM_ killThread)
saveServer :: M ()
saveServer = asks store >>= liftIO . closeNtfDbStore >> saveServerStats
@@ -258,7 +261,7 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg, startOptions}
let threadsCount = 0
#endif
let NtfSubscriber {smpSubscribers, smpAgent = a} = subscriber
NtfPushServer {pushQ} = pushServer
NtfPushServer {pushWorkers} = pushServer
SMPClientAgent {smpClients, smpSessions, smpSubWorkers} = a
srvSubscribers <- getSMPWorkerMetrics a smpSubscribers
srvClients <- getSMPWorkerMetrics a smpClients
@@ -268,7 +271,7 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg, startOptions}
ntfPendingServiceSubs <- getSMPServiceSubMetrics a pendingServiceSubs snd
ntfPendingQueueSubs <- getSMPSubMetrics a pendingQueueSubs
smpSessionCount <- M.size <$> readTVarIO smpSessions
apnsPushQLength <- atomically $ lengthTBQueue pushQ
apnsPushQLength <- pushWorkersQLength pushWorkers
pure
NtfRealTimeMetrics
{ threadsCount,
@@ -527,9 +530,9 @@ ntfSubscriber NtfSubscriber {smpAgent = ca@SMPClientAgent {msgQ, agentQ}} =
where
receiveSMP = do
st <- asks store
NtfPushServer {pushQ} <- asks pushServer
ps <- asks pushServer
stats <- asks serverStats
liftIO $ forever $ do
forever $ do
((_, srv@(SMPServer (h :| _) _ _), _), _thVersion, sessionId, ts) <- atomically $ readTBQueue msgQ
forM ts $ \(ntfId, t) -> case t of
STUnexpectedError e -> logError $ "SMP client unexpected error: " <> tshow e -- uncorrelated response, should not happen
@@ -538,24 +541,25 @@ ntfSubscriber NtfSubscriber {smpAgent = ca@SMPClientAgent {msgQ, agentQ}} =
let smpQueue = SMPQueueNtf srv ntfId
case msgOrErr of
Right (SMP.NMSG nmsgNonce encNMsgMeta) -> do
ntfTs <- getSystemTime
updatePeriodStats (activeSubs stats) ntfId
ntfTs <- liftIO getSystemTime
liftIO $ updatePeriodStats (activeSubs stats) ntfId
let newNtf = PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta}
srvHost_ = if isOwnServer ca srv then Just (safeDecodeUtf8 $ strEncode h) else Nothing
addTokenLastNtf st newNtf >>= \case
srvHost = safeDecodeUtf8 $ strEncode h
isOwn = isOwnServer ca srv
liftIO (addTokenLastNtf st newNtf) >>= \case
Right (tkn, lastNtfs) -> do
atomically $ writeTBQueue pushQ (srvHost_, tkn, PNMessage lastNtfs)
incNtfStat_ stats ntfReceived
mapM_ (`incServerStat` ntfReceivedOwn stats) srvHost_
Left AUTH -> do
pushNotification ps (Just srvHost) isOwn tkn $ PNMessage lastNtfs
liftIO $ incNtfStat_ stats ntfReceived
when isOwn $ liftIO $ incServerStat srvHost (ntfReceivedOwn stats)
Left AUTH -> liftIO $ do
incNtfStat_ stats ntfReceivedAuth
mapM_ (`incServerStat` ntfReceivedAuthOwn stats) srvHost_
when isOwn $ incServerStat srvHost (ntfReceivedAuthOwn stats)
Left _ -> pure ()
Right SMP.END ->
whenM (atomically $ activeClientSession' ca sessionId srv) $
void $ updateSrvSubStatus st smpQueue NSEnd
void $ liftIO $ updateSrvSubStatus st smpQueue NSEnd
Right SMP.DELD ->
void $ updateSrvSubStatus st smpQueue NSDeleted
void $ liftIO $ updateSrvSubStatus st smpQueue NSDeleted
Right (SMP.ERR e) -> logError $ "SMP server error: " <> tshow e
Right _ -> logError "SMP server unexpected response"
Left e -> logError $ "SMP client error: " <> tshow e
@@ -633,13 +637,28 @@ logSubStatus srv event n updated =
showServer' :: SMPServer -> Text
showServer' = decodeLatin1 . strEncode . host
ntfPush :: NtfPushServer -> M ()
ntfPush s@NtfPushServer {pushQ, srvDeliveryLocks} = forever $ do
(srvHost_, tkn@NtfTknRec {ntfTknId, token = t@(DeviceToken pp _), tknStatus}, ntf) <- atomically (readTBQueue pushQ)
pushNotification :: NtfPushServer -> Maybe T.Text -> OwnServer -> NtfTknRec -> PushNotification -> M ()
pushNotification s srvHost_ isOwn tkn ntf = do
q <- getOrCreatePushWorker s srvHost_ isOwn
atomically $ writeTBQueue q (tkn, ntf)
getOrCreatePushWorker :: NtfPushServer -> Maybe T.Text -> OwnServer -> M (TBQueue (NtfTknRec, PushNotification))
getOrCreatePushWorker s@NtfPushServer {pushWorkers, pushWorkerSeq, pushQSize} srvHost_ isOwn = do
ts <- liftIO getCurrentTime
atomically (getSessVar pushWorkerSeq srvHost_ pushWorkers ts) >>= \case
Left v -> do
q <- liftIO $ newTBQueueIO pushQSize
tId <- mkWeakThreadId =<< forkIO (runPushWorker s srvHost_ isOwn q)
atomically $ putTMVar (sessionVar v) PushWorker {workerQ = q, workerThreadId = tId}
pure q
Right v -> workerQ <$> atomically (readTMVar $ sessionVar v)
runPushWorker :: NtfPushServer -> Maybe T.Text -> OwnServer -> TBQueue (NtfTknRec, PushNotification) -> M ()
runPushWorker s srvHost_ isOwn q = forever $ do
(tkn@NtfTknRec {ntfTknId, token = t@(DeviceToken pp _), tknStatus}, ntf) <- atomically (readTBQueue q)
liftIO $ logDebug $ "sending push notification to " <> T.pack (show pp)
lock <- liftIO $ getDeliveryLock srvDeliveryLocks srvHost_
st <- asks store
void $ forkIO $ withMVar lock $ \_ -> case ntf of
case ntf of
PNVerification _ ->
liftIO (deliverNotification st pp tkn ntf) >>= \case
Right _ -> do
@@ -658,19 +677,19 @@ ntfPush s@NtfPushServer {pushQ, srvDeliveryLocks} = forever $ do
liftIO (deliverNotification st pp tkn ntf) >>= \case
Left _ -> do
incNtfStatT t ntfFailed
liftIO $ mapM_ (`incServerStat` ntfFailedOwn stats) srvHost_
when isOwn $ liftIO $ mapM_ (`incServerStat` ntfFailedOwn stats) srvHost_
Right () -> do
incNtfStatT t ntfDelivered
liftIO $ mapM_ (`incServerStat` ntfDeliveredOwn stats) srvHost_
when isOwn $ liftIO $ mapM_ (`incServerStat` ntfDeliveredOwn stats) srvHost_
where
checkActiveTkn :: NtfTknStatus -> M () -> M ()
checkActiveTkn status action
| status == NTActive = action
| otherwise = liftIO $ logError "bad notification token status"
deliverNotification :: NtfPostgresStore -> PushProvider -> NtfTknRec -> PushNotification -> IO (Either PushProviderError ())
deliverNotification st pp tkn@NtfTknRec {ntfTknId} ntf = do
deliverNotification st pp tkn@NtfTknRec {ntfTknId} ntf' = do
(deliver, clientVar) <- getPushClient s pp
runExceptT (deliver tkn ntf) >>= \case
runExceptT (deliver tkn ntf') >>= \case
Right _ -> pure $ Right ()
Left e -> case e of
PPConnection ce -> retryDeliver clientVar $ "connection " <> tshow ce
@@ -682,14 +701,12 @@ ntfPush s@NtfPushServer {pushQ, srvDeliveryLocks} = forever $ do
err e
PPPermanentError -> err e
where
-- removeSessVar checks identity, so concurrent retries collapse: the first
-- one removes the failing client, subsequent ones observe a fresh replacement
retryDeliver :: PushClientVar -> Text -> IO (Either PushProviderError ())
retryDeliver oldVar reason = do
logWarn $ "retrying push (" <> tshow pp <> ", " <> tshow ntfTknId <> "): " <> reason
atomically $ removeSessVar oldVar pp (pushClients s)
(deliver, _) <- getPushClient s pp
runExceptT (deliver tkn ntf) >>= \case
runExceptT (deliver tkn ntf') >>= \case
Right _ -> pure $ Right ()
Left e -> case e of
PPTokenInvalid r -> do
@@ -698,15 +715,26 @@ ntfPush s@NtfPushServer {pushQ, srvDeliveryLocks} = forever $ do
_ -> err e
err e = logError ("Push provider error (" <> tshow pp <> ", " <> tshow ntfTknId <> "): " <> tshow e) $> Left e
pushWorkersQLength :: TMap (Maybe T.Text) PushWorkerVar -> IO Natural
pushWorkersQLength workers = do
ws <- readTVarIO workers
foldM addQLength 0 ws
where
addQLength acc v =
atomically (tryReadTMVar $ sessionVar v) >>= \case
Just PushWorker {workerQ} -> (acc +) <$> atomically (lengthTBQueue workerQ)
Nothing -> pure acc
periodicNtfsThread :: NtfPushServer -> M ()
periodicNtfsThread NtfPushServer {pushQ} = do
periodicNtfsThread s = do
st <- asks store
ntfsInterval <- asks $ periodicNtfsInterval . config
let interval = 1000000 * ntfsInterval
q <- getOrCreatePushWorker s Nothing False
liftIO $ forever $ do
threadDelay interval
now <- systemSeconds <$> getSystemTime
cnt <- withPeriodicNtfTokens st now $ \tkn -> atomically $ writeTBQueue pushQ (Nothing, tkn, PNCheckMessages)
cnt <- withPeriodicNtfTokens st now $ \tkn -> atomically $ writeTBQueue q (tkn, PNCheckMessages)
logNote $ "Scheduled periodic notifications: " <> tshow cnt
runNtfClientTransport :: Transport c => THandleNTF c 'TServer -> M ()
@@ -796,7 +824,7 @@ verifyNtfTransmission st thAuth (tAuth, authorized, (corrId, entId, cmd)) = case
e -> VRFailed e
client :: NtfServerClient -> NtfSubscriber -> NtfPushServer -> M ()
client NtfServerClient {rcvQ, sndQ} ns@NtfSubscriber {smpAgent = ca} NtfPushServer {pushQ} =
client NtfServerClient {rcvQ, sndQ} ns@NtfSubscriber {smpAgent = ca} ps =
forever $
atomically (readTBQueue rcvQ)
>>= mapM processCommand
@@ -813,7 +841,7 @@ client NtfServerClient {rcvQ, sndQ} ns@NtfSubscriber {smpAgent = ca} NtfPushServ
ts <- liftIO $ getSystemDate
let tkn = mkNtfTknRec tknId newTkn srvDhPrivKey dhSecret regCode ts
withNtfStore (`addNtfToken` tkn) $ \_ -> do
atomically $ writeTBQueue pushQ (Nothing, tkn, PNVerification regCode)
pushNotification ps Nothing False tkn $ PNVerification regCode
incNtfStatT token ntfVrfQueued
incNtfStatT token tknCreated
pure $ NRTknId tknId srvDhPubKey
@@ -829,7 +857,7 @@ client NtfServerClient {rcvQ, sndQ} ns@NtfSubscriber {smpAgent = ca} NtfPushServ
| otherwise -> withNtfStore (\st -> updateTknStatus st tkn NTRegistered) $ \_ -> sendVerification
where
sendVerification = do
atomically $ writeTBQueue pushQ (Nothing, tkn, PNVerification tknRegCode)
pushNotification ps Nothing False tkn $ PNVerification tknRegCode
incNtfStatT token ntfVrfQueued
pure $ NRTknId ntfTknId $ C.publicKey tknDhPrivKey
TVFY code -- this allows repeated verification for cases when client connection dropped before server response
@@ -847,7 +875,7 @@ client NtfServerClient {rcvQ, sndQ} ns@NtfSubscriber {smpAgent = ca} NtfPushServ
regCode <- getRegCode
let tkn' = tkn {token = token', tknStatus = NTRegistered, tknRegCode = regCode}
withNtfStore (`replaceNtfToken` tkn') $ \_ -> do
atomically $ writeTBQueue pushQ (Nothing, tkn', PNVerification regCode)
pushNotification ps Nothing False tkn' $ PNVerification regCode
incNtfStatT token ntfVrfQueued
incNtfStatT token tknReplaced
pure NROk
@@ -14,6 +14,8 @@ module Simplex.Messaging.Notifications.Server.Env
SMPSubscriber (..),
NtfPushServer (..),
PushClientVar,
PushWorker (..),
PushWorkerVar,
NtfRequest (..),
NtfServerClient (..),
defaultInactiveClientExpiration,
@@ -21,7 +23,6 @@ module Simplex.Messaging.Notifications.Server.Env
newNtfSubscriber,
newNtfPushServer,
getPushClient,
getDeliveryLock,
newNtfServerClient,
) where
@@ -63,7 +64,6 @@ import Simplex.Messaging.Transport (ASrvTransport, SMPServiceRole (..), ServiceC
import Simplex.Messaging.Transport.Server (AddHTTP, ServerCredentials, TransportServerConfig, loadFingerprint, loadServerCredential)
import System.Exit (exitFailure)
import System.Mem.Weak (Weak)
import UnliftIO.MVar
import UnliftIO.STM
data NtfServerConfig = NtfServerConfig
@@ -167,32 +167,34 @@ data SMPSubscriber = SMPSubscriber
}
data NtfPushServer = NtfPushServer
{ pushQ :: TBQueue (Maybe T.Text, NtfTknRec, PushNotification), -- Maybe Text is a hostname of "own" server
{ pushWorkers :: TMap (Maybe T.Text) PushWorkerVar, -- keyed by SMP server hostname, Nothing for non-server notifications
pushWorkerSeq :: TVar Int,
pushQSize :: Natural,
pushClients :: TMap PushProvider PushClientVar,
pushClientSeq :: TVar Int,
-- one lock per srvHost_ serializes per-server delivery while different servers proceed in parallel
srvDeliveryLocks :: TMap (Maybe T.Text) (MVar ()),
apnsConfig :: APNSPushClientConfig
}
data PushWorker = PushWorker
{ workerQ :: TBQueue (NtfTknRec, PushNotification),
workerThreadId :: Weak ThreadId
}
type PushWorkerVar = SessionVar PushWorker
-- The Either communicates client-creation failure from the winner to the waiters.
type PushClientVar = SessionVar (Either E.SomeException PushProviderClient)
newNtfPushServer :: Natural -> APNSPushClientConfig -> IO NtfPushServer
newNtfPushServer qSize apnsConfig = do
pushQ <- newTBQueueIO qSize
newNtfPushServer pushQSize apnsConfig = do
pushWorkers <- TM.emptyIO
pushWorkerSeq <- newTVarIO 0
pushClients <- TM.emptyIO
pushClientSeq <- newTVarIO 0
srvDeliveryLocks <- TM.emptyIO
pure NtfPushServer {pushQ, pushClients, pushClientSeq, srvDeliveryLocks, apnsConfig}
pure NtfPushServer {pushWorkers, pushWorkerSeq, pushQSize, pushClients, pushClientSeq, apnsConfig}
getDeliveryLock :: TMap (Maybe T.Text) (MVar ()) -> Maybe T.Text -> IO (MVar ())
getDeliveryLock locks k = do
newLock <- newMVar ()
atomically $ TM.lookup k locks >>= maybe (TM.insert k newLock locks $> newLock) pure
-- | Single-flight access to the per-provider push client.
-- take (getSessVar) → create (newPushClient) or wait (waitForPushClient).
-- The returned PushClientVar is the handle retryDeliver passes to removeSessVar to evict
-- this specific instance before re-fetching.
getPushClient :: NtfPushServer -> PushProvider -> IO (PushProviderClient, PushClientVar)
@@ -343,7 +343,7 @@ apnsPushProviderClient c@APNSPushClient {nonceDrg, apnsCfg} tkn@NtfTknRec {token
nonce <- atomically $ C.randomCbNonce nonceDrg
apnsNtf <- liftEither $ first PPCryptoError $ apnsNotification tkn nonce (paddedNtfLength apnsCfg) pn
req <- liftIO $ apnsRequest c tknStr apnsNtf
HTTP2Response {response, respBody = HTTP2Body {bodyHead}} <- liftHTTPS2 $ sendRequestDirect http2 req Nothing
HTTP2Response {response, respBody = HTTP2Body {bodyHead}} <- liftHTTPS2 $ sendRequest http2 req Nothing
let status = H.responseStatus response
reason' = maybe "" reason $ J.decodeStrict' bodyHead
if status == Just N.ok200