From 26979ff6b59a157e8d6432909d05c9cbf208b015 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Tue, 9 Jul 2024 08:36:03 +0100 Subject: [PATCH] smp server: simplify client subscriptions (#1223) --- src/Simplex/Messaging/Server.hs | 81 ++++++++++++------------- src/Simplex/Messaging/Server/Env/STM.hs | 7 ++- 2 files changed, 43 insertions(+), 45 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 567ff13b6..014bcd366 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -514,11 +514,11 @@ clientDisconnected c@Client {clientId, subscriptions, connected, sessionId, endT sameClientId :: Client -> Client -> Bool sameClientId Client {clientId} Client {clientId = cId'} = clientId == cId' -cancelSub :: TVar Sub -> IO () -cancelSub sub = - readTVarIO sub >>= \case - Sub {subThread = SubThread t} -> liftIO $ deRefWeak t >>= mapM_ killThread - _ -> return () +cancelSub :: Sub -> IO () +cancelSub s = + readTVarIO (subThread s) >>= \case + SubThread t -> liftIO $ deRefWeak t >>= mapM_ killThread + _ -> pure () receive :: Transport c => THandleSMP c 'TServer -> Client -> M () receive h@THandle {params = THandleParams {thAuth}} Client {rcvQ, sndQ, rcvActiveAt, sessionId} = do @@ -901,23 +901,23 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi Nothing -> do atomically $ modifyTVar' (qSub stats) (+ 1) newSub >>= deliver - Just sub -> - readTVarIO sub >>= \case - Sub {subThread = ProhibitSub} -> do + Just s@Sub {subThread} -> + readTVarIO subThread >>= \case + ProhibitSub -> do -- cannot use SUB in the same connection where GET was used atomically $ modifyTVar' (qSubProhibited stats) (+ 1) pure (corrId, rId, ERR $ CMD PROHIBITED) - s -> do + _ -> do atomically $ modifyTVar' (qSubDuplicate stats) (+ 1) - atomically (tryTakeTMVar $ delivered s) >> deliver sub + atomically (tryTakeTMVar $ delivered s) >> deliver s where - newSub :: M (TVar Sub) + newSub :: M Sub newSub = time "SUB newSub" . atomically $ do writeTQueue subscribedQ (rId, clnt) - sub <- newTVar =<< newSubscription NoSub + sub <- newSubscription NoSub TM.insert rId sub subscriptions pure sub - deliver :: TVar Sub -> M (Transmission BrokerMsg) + deliver :: Sub -> M (Transmission BrokerMsg) deliver sub = do q <- getStoreMsgQueue "SUB" rId msg_ <- atomically $ tryPeekMsg q @@ -928,9 +928,9 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi atomically (TM.lookup queueId subscriptions) >>= \case Nothing -> atomically newSub >>= getMessage_ - Just sub -> - readTVarIO sub >>= \case - s@Sub {subThread = ProhibitSub} -> + Just s@Sub {subThread} -> + readTVarIO subThread >>= \case + ProhibitSub -> atomically (tryTakeTMVar $ delivered s) >> getMessage_ s -- cannot use GET in the same connection where there is an active subscription @@ -939,8 +939,7 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi newSub :: STM Sub newSub = do s <- newSubscription ProhibitSub - sub <- newTVar s - TM.insert queueId sub subscriptions + TM.insert queueId s subscriptions pure s getMessage_ :: Sub -> M (Transmission BrokerMsg) getMessage_ s = do @@ -968,10 +967,10 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi Nothing -> pure $ err NO_MSG Just sub -> atomically (getDelivered sub) >>= \case - Just s -> do + Just st -> do q <- getStoreMsgQueue "ACK" queueId - case s of - Sub {subThread = ProhibitSub} -> do + case st of + ProhibitSub -> do deletedMsg_ <- atomically $ tryDelMsg q msgId mapM_ updateStats deletedMsg_ pure ok @@ -981,12 +980,11 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi deliverMessage "ACK" qr queueId sub msg_ _ -> pure $ err NO_MSG where - getDelivered :: TVar Sub -> STM (Maybe Sub) - getDelivered sub = do - s@Sub {delivered} <- readTVar sub + getDelivered :: Sub -> STM (Maybe SubscriptionThread) + getDelivered Sub {delivered, subThread} = do tryTakeTMVar delivered $>>= \msgId' -> if msgId == msgId' || B.null msgId - then pure $ Just s + then Just <$> readTVar subThread else putTMVar delivered msgId' $> Nothing updateStats :: Message -> M () updateStats = \case @@ -1074,37 +1072,36 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi deliverToSub = TM.lookup rId subscribers $>>= \rc@Client {subscriptions = subs, sndQ = q} -> TM.lookup rId subs - $>>= \sub -> readTVar sub >>= \case - s@Sub {subThread = NoSub, delivered} -> + $>>= \s@Sub {subThread, delivered} -> readTVar subThread >>= \case + NoSub -> tryTakeTMVar delivered >>= \case Just _ -> pure Nothing -- if a message was already delivered, should not deliver more Nothing -> ifM (isFullTBQueue q) - (modifyTVar' sub (\s' -> s' {subThread = SubPending}) $> Just (rc, sub)) + (writeTVar subThread SubPending $> Just (rc, s)) (deliver q s $> Nothing) _ -> pure Nothing deliver q s = do let encMsg = encryptMsg qr msg writeTBQueue q [(CorrId "", rId, MSG encMsg)] void $ setDelivered s msg - forkDeliver (rc@Client {sndQ = q}, sub) = do + forkDeliver (rc@Client {sndQ = q}, s@Sub {subThread, delivered}) = do t <- mkWeakThreadId =<< forkIO deliverThread - atomically . modifyTVar' sub $ \case + atomically . modifyTVar' subThread $ \case -- this case is needed because deliverThread can exit before it - s@Sub {subThread = SubPending} -> s {subThread = SubThread t} - s -> s + SubPending -> SubThread t + st -> st where deliverThread = do labelMyThread $ B.unpack ("client $" <> encode sessionId) <> " deliver/SEND" time "deliver" . atomically $ whenM (maybe False (sameClientId rc) <$> TM.lookup rId subscribers) $ do - s@Sub {delivered} <- readTVar sub tryTakeTMVar delivered >>= \case Just _ -> pure () -- if a message was already delivered, should not deliver more Nothing -> do deliver q s - writeTVar sub $! s {subThread = NoSub} + writeTVar subThread NoSub trySendNotification :: NtfCreds -> Message -> TVar ChaChaDRG -> STM (Maybe Bool) trySendNotification NtfCreds {notifierId, rcvNtfDhSecret} msg ntfNonceDrg = @@ -1180,11 +1177,11 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi verified = \case VRVerified qr -> Right (qr, (corrId', entId', cmd')) VRFailed -> Left (corrId', entId', ERR AUTH) - deliverMessage :: T.Text -> QueueRec -> RecipientId -> TVar Sub -> Maybe Message -> M (Transmission BrokerMsg) - deliverMessage name qr rId sub msg_ = time (name <> " deliver") . atomically $ - readTVar sub >>= \case - Sub {subThread = ProhibitSub} -> pure resp - s -> case msg_ of + deliverMessage :: T.Text -> QueueRec -> RecipientId -> Sub -> Maybe Message -> M (Transmission BrokerMsg) + deliverMessage name qr rId s@Sub {subThread} msg_ = time (name <> " deliver") . atomically $ + readTVar subThread >>= \case + ProhibitSub -> pure resp + _ -> case msg_ of Just msg -> let encMsg = encryptMsg qr msg in setDelivered s msg $> (corrId, rId, MSG encMsg) @@ -1232,9 +1229,9 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi pure QueueInfo {qiSnd = isJust senderKey, qiNtf = isJust notifier, qiSub, qiSize, qiMsg} pure (corrId, queueId, INFO info) where - mkQSub sub = do - Sub {subThread, delivered} <- readTVar sub - let qSubThread = case subThread of + mkQSub Sub {subThread, delivered} = do + st <- readTVar subThread + let qSubThread = case st of NoSub -> QNoSub SubPending -> QSubPending SubThread _ -> QSubThread diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index 559aea280..b40e9fc16 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -144,7 +144,7 @@ type ClientId = Int data Client = Client { clientId :: ClientId, - subscriptions :: TMap RecipientId (TVar Sub), + subscriptions :: TMap RecipientId Sub, ntfSubscriptions :: TMap NotifierId (), rcvQ :: TBQueue (NonEmpty (Maybe QueueRec, Transmission Cmd)), sndQ :: TBQueue (NonEmpty (Transmission BrokerMsg)), @@ -163,7 +163,7 @@ data Client = Client data SubscriptionThread = NoSub | SubPending | SubThread (Weak ThreadId) | ProhibitSub data Sub = Sub - { subThread :: SubscriptionThread, + { subThread :: TVar SubscriptionThread, delivered :: TMVar MsgId } @@ -193,8 +193,9 @@ newClient nextClientId qSize thVersion sessionId createdAt = do return Client {clientId, subscriptions, ntfSubscriptions, rcvQ, sndQ, msgQ, procThreads, endThreads, endThreadSeq, thVersion, sessionId, connected, createdAt, rcvActiveAt, sndActiveAt} newSubscription :: SubscriptionThread -> STM Sub -newSubscription subThread = do +newSubscription st = do delivered <- newEmptyTMVar + subThread <- newTVar st return Sub {subThread, delivered} newEnv :: ServerConfig -> IO Env