From ba3c75e58c8a44e710e3d6b1d9681ebe628e47d7 Mon Sep 17 00:00:00 2001 From: Evgeny Date: Mon, 7 Jul 2025 18:49:47 +0100 Subject: [PATCH] smp server: correctly track if ntf service is subscribed and total subscribed queues count (fixes race condition between NSUB and NSUBS from notification server) (#1583) --- src/Simplex/Messaging/Server.hs | 19 ++++++++++--------- src/Simplex/Messaging/Server/Env/STM.hs | 6 ++++++ 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 03a672408..7c0237e18 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -1321,7 +1321,7 @@ client -- TODO [certs rcv] rcv subscriptions Server {subscribers, ntfSubscribers} ms - clnt@Client {clientId, subscriptions, ntfSubscriptions, serviceSubsCount = _todo', ntfServiceSubsCount, rcvQ, sndQ, clientTHParams = thParams'@THandleParams {sessionId}, procThreads} = do + clnt@Client {clientId, subscriptions, ntfSubscriptions, ntfServiceSubscribed, serviceSubsCount = _todo', ntfServiceSubsCount, rcvQ, sndQ, clientTHParams = thParams'@THandleParams {sessionId}, procThreads} = do labelMyThread . B.unpack $ "client $" <> encode sessionId <> " commands" let THandleParams {thVersion} = thParams' service = peerClientService =<< thAuth thParams' @@ -1721,18 +1721,19 @@ client subscribeServiceNotifications :: THPeerClientService -> M s BrokerMsg subscribeServiceNotifications THClientService {serviceId} = do - srvSubs <- readTVarIO ntfServiceSubsCount - if srvSubs == 0 - then + subscribed <- readTVarIO ntfServiceSubscribed + if subscribed + then SOKS <$> readTVarIO ntfServiceSubsCount + else liftIO (getNtfServiceQueueCount @(StoreQueue s) (queueStore ms) serviceId) >>= \case Left e -> pure $ ERR e - Right count -> do + Right !count' -> do atomically $ do - modifyTVar' ntfServiceSubsCount (+ count) -- service count - modifyTVar' (totalServiceSubs ntfSubscribers) (+ count) -- server count for all services + writeTVar ntfServiceSubscribed True + count <- swapTVar ntfServiceSubsCount count' + modifyTVar' (totalServiceSubs ntfSubscribers) (+ (count' - count)) -- server count for all services atomically $ writeTQueue (subQ ntfSubscribers) (CSService serviceId, clientId) - pure $ SOKS count - else pure $ SOKS srvSubs + pure $ SOKS count' acknowledgeMsg :: MsgId -> StoreQueue s -> QueueRec -> M s (Transmission BrokerMsg) acknowledgeMsg msgId q qr = time "ACK" $ do diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index a04a8acd8..8039a252e 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -389,6 +389,8 @@ data Client s = Client { clientId :: ClientId, subscriptions :: TMap RecipientId Sub, ntfSubscriptions :: TMap NotifierId (), + serviceSubscribed :: TVar Bool, -- set independently of serviceSubsCount, to track whether service subscription command was received + ntfServiceSubscribed :: TVar Bool, serviceSubsCount :: TVar Int64, -- only one service can be subscribed, based on its certificate, this is subscription count ntfServiceSubsCount :: TVar Int64, -- only one service can be subscribed, based on its certificate, this is subscription count rcvQ :: TBQueue (NonEmpty (VerifiedTransmission s)), @@ -458,6 +460,8 @@ newClient :: ClientId -> Natural -> THandleParams SMPVersion 'TServer -> SystemT newClient clientId qSize clientTHParams createdAt = do subscriptions <- TM.emptyIO ntfSubscriptions <- TM.emptyIO + serviceSubscribed <- newTVarIO False + ntfServiceSubscribed <- newTVarIO False serviceSubsCount <- newTVarIO 0 ntfServiceSubsCount <- newTVarIO 0 rcvQ <- newTBQueueIO qSize @@ -474,6 +478,8 @@ newClient clientId qSize clientTHParams createdAt = do { clientId, subscriptions, ntfSubscriptions, + serviceSubscribed, + ntfServiceSubscribed, serviceSubsCount, ntfServiceSubsCount, rcvQ,