From 0b45f7c00f8a55b6345cdc8a03f42da90fbb91fa Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Sun, 5 Oct 2025 22:03:07 +0100 Subject: [PATCH] use new session subscriptions data --- src/Simplex/Messaging/Agent/Client.hs | 124 ++++++-------------- src/Simplex/Messaging/Agent/TSessionSubs.hs | 12 ++ tests/Test.hs | 4 +- tests/Util.hs | 4 +- 4 files changed, 53 insertions(+), 91 deletions(-) diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 1a5fe56da..dc0f4ca0d 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -202,6 +202,7 @@ import qualified Data.ByteString.Base64 as B64 import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import Data.Composition ((.:)) +import Data.Containers.ListUtils (nubOrd) import Data.Either (isRight, partitionEithers) import Data.Functor (($>)) import Data.Int (Int64) @@ -218,7 +219,6 @@ import Data.Text.Encoding import Data.Time (UTCTime, addUTCTime, defaultTimeLocale, formatTime, getCurrentTime) import Data.Time.Clock.System (getSystemTime) import Data.Word (Word16) -import GHC.Conc (unsafeIOToSTM) import Network.Socket (HostName) import Simplex.FileTransfer.Client (XFTPChunkSpec (..), XFTPClient, XFTPClientConfig (..), XFTPClientError) import qualified Simplex.FileTransfer.Client as X @@ -238,8 +238,6 @@ import Simplex.Messaging.Agent.Store.Common (DBStore, withTransaction) import qualified Simplex.Messaging.Agent.Store.DB as DB import Simplex.Messaging.Agent.TSessionSubs (TSessionSubs) import qualified Simplex.Messaging.Agent.TSessionSubs as SS -import Simplex.Messaging.Agent.TRcvQueues (TRcvQueues) -import qualified Simplex.Messaging.Agent.TRcvQueues as RQ import Simplex.Messaging.Client import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding @@ -339,8 +337,8 @@ data AgentClient = AgentClient userNetworkInfo :: TVar UserNetworkInfo, userNetworkUpdated :: TVar (Maybe UTCTime), subscrConns :: TVar (Set ConnId), - activeSubs :: TRcvQueues (SessionId, RcvQueueSub), - pendingSubs :: TRcvQueues RcvQueueSub, + -- activeSubs :: TRcvQueues (SessionId, RcvQueueSub), + -- pendingSubs :: TRcvQueues RcvQueueSub, currentSubs :: TSessionSubs, removedSubs :: TMap (UserId, SMPServer, SMP.RecipientId) SMPClientError, workerSeq :: TVar Int, @@ -508,8 +506,6 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg, presetDomai userNetworkInfo <- newTVarIO $ UserNetworkInfo UNOther True userNetworkUpdated <- newTVarIO Nothing subscrConns <- newTVarIO S.empty - activeSubs <- RQ.empty - pendingSubs <- RQ.empty currentSubs <- SS.emptyIO removedSubs <- TM.emptyIO workerSeq <- newTVarIO 0 @@ -548,8 +544,8 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg, presetDomai userNetworkInfo, userNetworkUpdated, subscrConns, - activeSubs, - pendingSubs, + -- activeSubs, + -- pendingSubs, currentSubs, removedSubs, workerSeq, @@ -718,26 +714,20 @@ smpClientDisconnected c@AgentClient {active, smpClients, smpProxiedRelays} tSess -- because we can have a race condition when a new current client could have already -- made subscriptions active, and the old client would be processing diconnection later. removeClientAndSubs :: IO ([RcvQueueSub], [ConnId]) - removeClientAndSubs = do - (qs, cs, subs) <- atomically $ do - removeSessVar v tSess smpClients - ifM (readTVar active) removeSubs (pure ([], [], M.empty)) - -- TODO [subs] remove logs - when (M.keysSet subs /= S.fromList (map queueId qs)) $ error $ "setSubsPending different queues: " <> show (S.size $ M.keysSet subs) <> " " <> show (S.size $ S.fromList $ map queueId qs) - when (S.fromList (map qConnId $ M.elems subs) /= S.fromList (map qConnId qs)) $ error $ "setSubsPending different connections: " <> show (S.size (S.fromList $ map qConnId $ M.elems subs)) <> " " <> show (S.size $ S.fromList $ map qConnId qs) - pure (qs, cs) + removeClientAndSubs = atomically $ do + removeSessVar v tSess smpClients + ifM (readTVar active) removeSubs (pure ([], [])) where sessId = sessionId $ thParams client removeSubs = do - (qs, cs) <- RQ.getDelSessQueues tSess sessId $ activeSubs c - RQ.batchAddQueues qs $ pendingSubs c - -- TODO [subs] mode <- getSessionMode c subs <- SS.setSubsPending mode tSess sessId $ currentSubs c + let qs = M.elems subs + cs = nubOrd $ map qConnId qs -- this removes proxied relays that this client created sessions to destSrvs <- M.keys <$> readTVar prs forM_ destSrvs $ \destSrv -> TM.delete (userId, destSrv, cId) smpProxiedRelays - pure (qs, cs, subs) + pure (qs, cs) serverDown :: ([RcvQueueSub], [ConnId]) -> IO () serverDown (qs, conns) = whenM (readTVarIO active) $ do @@ -758,30 +748,20 @@ resubscribeSMPSession c@AgentClient {smpSubWorkers, workerSeq} tSess = do where getWorkerVar ts = ifM - (not <$> hasQueues) + (not <$> SS.hasPendingSubs tSess (currentSubs c)) (pure Nothing) -- prevent race with cleanup and adding pending queues in another call (Just <$> getSessVar workerSeq tSess smpSubWorkers ts) - where - hasQueues = do - -- TODO [subs] - yes <- RQ.hasSessQueues tSess (pendingSubs c) - yes' <- SS.hasPendingSubs tSess $ currentSubs c - when (yes /= yes') $ unsafeIOToSTM $ error "hasPendingSubs different result" - pure yes newSubWorker v = do a <- async $ void (E.tryAny runSubWorker) >> atomically (cleanup v) atomically $ putTMVar (sessionVar v) a runSubWorker = do ri <- asks $ reconnectInterval . config withRetryForeground ri isForeground (isNetworkOnline c) $ \_ loop -> do - pending <- liftIO $ RQ.getSessQueues tSess $ pendingSubs c - -- TODO [subs] - subs <- atomically $ SS.getPendingSubs tSess $ currentSubs c - when (M.keysSet subs /= S.fromList (map queueId pending)) $ error "getPendingSubs different queues" - unless (null pending) $ do + pending <- atomically $ SS.getPendingSubs tSess $ currentSubs c + unless (M.null pending) $ do liftIO $ waitUntilForeground c liftIO $ waitForUserNetwork c - handleNotify $ resubscribeSessQueues c tSess pending + handleNotify $ resubscribeSessQueues c tSess $ M.elems pending loop isForeground = (ASForeground ==) <$> readTVar (agentState c) cleanup :: SessionVar (Async ()) -> STM () @@ -937,8 +917,6 @@ closeAgentClient c = do atomically (swapTVar (smpSubWorkers c) M.empty) >>= mapM_ cancelReconnect clearWorkers smpDeliveryWorkers >>= mapM_ (cancelWorker . fst) clearWorkers asyncCmdWorkers >>= mapM_ cancelWorker - atomically $ RQ.clear $ activeSubs c - atomically $ RQ.clear $ pendingSubs c atomically $ SS.clear $ currentSubs c clear subscrConns clear getMsgLocks @@ -1528,10 +1506,7 @@ subscribeQueues c qs withEvents = do when (withEvents && not (null errs)) $ notifySub c "" $ ERRS $ map (first qConnId) errs pure $ map (second Left) errs <> concatMap L.toList rs where - addPendingSubs (tSess, qs') = atomically $ do - let qs'' = L.toList qs' - RQ.batchAddQueues qs'' $ pendingSubs c - SS.batchAddPendingSubs qs'' tSess $ currentSubs c + addPendingSubs (tSess, qs') = atomically $ SS.batchAddPendingSubs (L.toList qs') tSess $ currentSubs c subscribeQueues_ qs'@(tSess@(_, srv, _), _) = do (rs, active) <- subscribeSessQueues_ c qs' withEvents if active @@ -1572,12 +1547,7 @@ subscribeSessQueues_ c qs withEvents = sendClientBatch_ "SUB" False subscribeQue rs <- sendBatch (\smp' _ -> subscribeSMPQueues smp') smp NRMBackground qs' cs_ <- if withEvents - then do - cs <- liftIO $ RQ.getSessConns tSess $ activeSubs c - -- TODO [subs] - subs <- atomically $ SS.getActiveSubs tSess $ currentSubs c - when (S.fromList (map qConnId $ M.elems subs) /= cs) $ error "getActiveSubs different connections" - pure $ Just cs + then Just . S.fromList . map qConnId . M.elems <$> atomically (SS.getActiveSubs tSess $ currentSubs c) else pure Nothing active <- atomically $ @@ -1650,25 +1620,18 @@ sendBatch smpCmdFunc smp nm qs = L.zip qs <$> smpCmdFunc smp nm (L.map queueCred addSubscription :: AgentClient -> SessionId -> RcvQueueSub -> STM () addSubscription c sessId rq = do modifyTVar' (subscrConns c) $ S.insert $ qConnId rq - RQ.addSessQueue (sessId, rq) $ activeSubs c - RQ.deleteQueue rq $ pendingSubs c - -- TODO [subs] tSess <- mkSMPTransportSession c rq SS.addActiveSub sessId rq tSess $ currentSubs c failSubscription :: SomeRcvQueue q => AgentClient -> q -> SMPClientError -> STM () failSubscription c rq e = do - RQ.deleteQueue rq (pendingSubs c) - TM.insert (RQ.qKey rq) e (removedSubs c) - -- TODO [subs] + TM.insert (qUserId rq, qServer rq, queueId rq) e (removedSubs c) tSess <- mkSMPTransportSession c rq SS.deletePendingSub (queueId rq) tSess $ currentSubs c addPendingSubscription :: AgentClient -> RcvQueueSub -> STM () addPendingSubscription c rq = do modifyTVar' (subscrConns c) $ S.insert $ qConnId rq - RQ.addQueue rq $ pendingSubs c - -- TODO [subs] tSess <- mkSMPTransportSession c rq SS.addPendingSub rq tSess $ currentSubs c @@ -1685,39 +1648,25 @@ addNewQueueSubscription c rq' tSess sessId = do hasActiveSubscription :: SomeRcvQueue q => AgentClient -> q -> STM Bool hasActiveSubscription c rq = do - yes <- RQ.hasQueue rq $ activeSubs c - -- TODO [subs] tSess <- mkSMPTransportSession c rq - yes' <- SS.hasActiveSub (queueId rq) tSess $ currentSubs c - when (yes /= yes') $ unsafeIOToSTM $ error "hasActiveSub different result" - pure yes + SS.hasActiveSub (queueId rq) tSess $ currentSubs c {-# INLINE hasActiveSubscription #-} hasPendingSubscription :: SomeRcvQueue q => AgentClient -> q -> STM Bool hasPendingSubscription c rq = do - yes <- RQ.hasQueue rq $ pendingSubs c - -- TODO [subs] tSess <- mkSMPTransportSession c rq - yes' <- SS.hasPendingSub (queueId rq) tSess $ currentSubs c - when (yes /= yes') $ unsafeIOToSTM $ error "hasPendingSub different result" - pure yes + SS.hasPendingSub (queueId rq) tSess $ currentSubs c {-# INLINE hasPendingSubscription #-} removeSubscription :: SomeRcvQueue q => AgentClient -> ConnId -> q -> STM () removeSubscription c connId rq = do modifyTVar' (subscrConns c) $ S.delete connId - RQ.deleteQueue rq $ activeSubs c - RQ.deleteQueue rq $ pendingSubs c - -- TODO [subs] tSess <- mkSMPTransportSession c rq SS.deleteSub (queueId rq) tSess $ currentSubs c removeSubscriptions :: SomeRcvQueue q => AgentClient -> [ConnId] -> [q] -> STM () removeSubscriptions c connIds rqs = do unless (null connIds) $ modifyTVar' (subscrConns c) (`S.difference` (S.fromList connIds)) - RQ.batchDeleteQueues rqs $ activeSubs c - RQ.batchDeleteQueues rqs $ pendingSubs c - -- TODO [subs] batch forM_ rqs $ \rq -> do tSess <- mkSMPTransportSession c rq SS.deleteSub (queueId rq) tSess $ currentSubs c @@ -2424,15 +2373,16 @@ data ServerSessions = ServerSessions getAgentSubsTotal :: AgentClient -> [UserId] -> IO (SMPServerSubs, Bool) getAgentSubsTotal c userIds = do - ssActive <- getSubsCount activeSubs - ssPending <- getSubsCount pendingSubs + (ssActive, ssPending) <- SS.foldSessionSubs addSub (0, 0) $ currentSubs c sess <- hasSession . M.toList =<< readTVarIO (smpClients c) pure (SMPServerSubs {ssActive, ssPending}, sess) where - getSubsCount :: (AgentClient -> TRcvQueues q) -> IO Int - getSubsCount subs = M.foldrWithKey' addSub 0 <$> readTVarIO (RQ.getRcvQueues $ subs c) - addSub :: (UserId, SMPServer, SMP.RecipientId) -> q -> Int -> Int - addSub (userId, _, _) _ cnt = if userId `elem` userIds then cnt + 1 else cnt + addSub :: (Int, Int) -> (SMPTransportSession, SS.SessSubs) -> IO (Int, Int) + addSub acc@(!ssActive, !ssPending) ((userId, _, _), s) + | userId `elem` userIds = do + (active, pending) <- SS.mapSubs M.size s + pure (ssActive + active, ssPending + pending) + | otherwise = pure acc hasSession :: [(SMPTransportSession, SMPClientVar)] -> IO Bool hasSession = \case [] -> pure False @@ -2469,13 +2419,12 @@ getAgentServersSummary c@AgentClient {smpServersStats, xftpServersStats, ntfServ ntfServersSessions } where - getServerSubs = do - subs <- M.foldrWithKey' (addSub incActive) M.empty <$> readTVarIO (RQ.getRcvQueues $ activeSubs c) - M.foldrWithKey' (addSub incPending) subs <$> readTVarIO (RQ.getRcvQueues $ pendingSubs c) + getServerSubs = SS.foldSessionSubs addSub M.empty $ currentSubs c where - addSub f (userId, srv, _) _ = M.alter (Just . f . fromMaybe SMPServerSubs {ssActive = 0, ssPending = 0}) (userId, srv) - incActive ss = ss {ssActive = ssActive ss + 1} - incPending ss = ss {ssPending = ssPending ss + 1} + addSub subs ((userId, srv, _), s) = do + (active, pending) <- SS.mapSubs M.size s + let add ss = ss {ssActive = ssActive ss + active, ssPending = ssPending ss + pending} + pure $ M.alter (Just . add . fromMaybe (SMPServerSubs 0 0)) (userId, srv) subs Env {xftpAgent = XFTPAgent {xftpRcvWorkers, xftpSndWorkers, xftpDelWorkers}} = agentEnv getXFTPWorkerSrvs workers = foldM addSrv [] . M.toList =<< readTVarIO workers where @@ -2507,13 +2456,14 @@ data SubscriptionsInfo = SubscriptionsInfo getAgentSubscriptions :: AgentClient -> IO SubscriptionsInfo getAgentSubscriptions c = do - activeSubscriptions <- getSubs activeSubs - pendingSubscriptions <- getSubs pendingSubs + (activeSubscriptions, pendingSubscriptions) <- SS.foldSessionSubs addSubs ([], []) $ currentSubs c removedSubscriptions <- getRemovedSubs pure $ SubscriptionsInfo {activeSubscriptions, pendingSubscriptions, removedSubscriptions} where - getSubs :: (AgentClient -> TRcvQueues q) -> IO [SubInfo] - getSubs sel = map (`subInfo` Nothing) . M.keys <$> readTVarIO (RQ.getRcvQueues $ sel c) + addSubs :: ([SubInfo], [SubInfo]) -> (SMPTransportSession, SS.SessSubs) -> IO ([SubInfo], [SubInfo]) + addSubs (active, pending) ((userId, srv, _), s) = do + (active', pending') <- SS.mapSubs (map (\rId -> subInfo (userId, srv, rId) Nothing) . M.keys) s + pure (active' ++ active, pending' ++ pending) getRemovedSubs = map (uncurry subInfo . second Just) . M.assocs <$> readTVarIO (removedSubs c) subInfo :: (UserId, SMPServer, SMP.RecipientId) -> Maybe SMPClientError -> SubInfo subInfo (uId, srv, rId) err = SubInfo {userId = uId, server = enc srv, rcvId = enc rId, subError = show <$> err} diff --git a/src/Simplex/Messaging/Agent/TSessionSubs.hs b/src/Simplex/Messaging/Agent/TSessionSubs.hs index 7d3028dfa..8c0cd05de 100644 --- a/src/Simplex/Messaging/Agent/TSessionSubs.hs +++ b/src/Simplex/Messaging/Agent/TSessionSubs.hs @@ -4,6 +4,7 @@ module Simplex.Messaging.Agent.TSessionSubs ( TSessionSubs (sessionSubs), + SessSubs (..), emptyIO, clear, hasActiveSub, @@ -19,6 +20,8 @@ module Simplex.Messaging.Agent.TSessionSubs getPendingSubs, getActiveSubs, setSubsPending, + foldSessionSubs, + mapSubs, ) where @@ -162,3 +165,12 @@ setSubsPending_ s sessId_ = do writeTVar as M.empty modifyTVar' (pendingSubs s) $ M.union subs pure subs + +foldSessionSubs :: (a -> (SMPTransportSession, SessSubs) -> IO a) -> a -> TSessionSubs -> IO a +foldSessionSubs f a = foldM f a . M.assocs <=< readTVarIO . sessionSubs + +mapSubs :: (Map RecipientId RcvQueueSub -> a) -> SessSubs -> IO (a, a) +mapSubs f s = do + active <- readTVarIO $ activeSubs s + pending <- readTVarIO $ pendingSubs s + pure (f active, f pending) diff --git a/tests/Test.hs b/tests/Test.hs index 1339904a5..7296c1ff3 100644 --- a/tests/Test.hs +++ b/tests/Test.hs @@ -134,13 +134,13 @@ main = do ntfServerTests (transport @TLS, ASType SQSPostgres SMSPostgres) around_ (postgressBracket testServerDBConnectInfo) $ do xdescribe "SMP client agent, postgres+jornal message store" $ agentTests (transport @TLS, ASType SQSPostgres SMSJournal) - fdescribe "SMP client agent, postgres-only message store" $ agentTests (transport @TLS, ASType SQSPostgres SMSPostgres) + describe "SMP client agent, postgres-only message store" $ agentTests (transport @TLS, ASType SQSPostgres SMSPostgres) xdescribe "SMP proxy, postgres+jornal message store" $ before (pure $ ASType SQSPostgres SMSJournal) smpProxyTests describe "SMP proxy, postgres-only message store" $ before (pure $ ASType SQSPostgres SMSPostgres) smpProxyTests #endif - xdescribe "SMP client agent, jornal message store" $ agentTests (transport @TLS, ASType SQSMemory SMSJournal) + describe "SMP client agent, jornal message store" $ agentTests (transport @TLS, ASType SQSMemory SMSJournal) describe "SMP proxy, jornal message store" $ before (pure $ ASType SQSMemory SMSJournal) smpProxyTests describe "XFTP" $ do diff --git a/tests/Util.hs b/tests/Util.hs index 6e5a3f48a..9a4049d68 100644 --- a/tests/Util.hs +++ b/tests/Util.hs @@ -48,7 +48,7 @@ newtype TestWrapper a = TestWrapper a -- TODO [ntfdb] running wiht LogWarn level shows potential issue "Queue count differs" testLogLevel :: LogLevel -testLogLevel = LogWarn +testLogLevel = LogError instance Example a => Example (TestWrapper a) where type Arg (TestWrapper a) = Arg a @@ -56,7 +56,7 @@ instance Example a => Example (TestWrapper a) where ci <- envCI runTest `E.catches` [E.Handler (onTestFailure ci), E.Handler (onTestException ci)] where - tt = 30 + tt = 120 runTest = timeout (tt * 1000000) (evaluateExample action params hooks state) `finally` callCommand "sync" >>= \case Just r -> pure r