From f229e135e33ec5ee1f6d0978bd903d84b0b60efa Mon Sep 17 00:00:00 2001 From: Evgeny Date: Fri, 16 Aug 2024 10:06:22 +0100 Subject: [PATCH] agent: reduce subscription transactions (#1259) * agent: reduce subscription transactions * nub * remove commented --- src/Simplex/Messaging/Agent.hs | 14 ++++++-------- src/Simplex/Messaging/Agent/Client.hs | 4 ---- src/Simplex/Messaging/Agent/Store/SQLite.hs | 4 ++++ 3 files changed, 10 insertions(+), 12 deletions(-) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 672375aaf..17d11246c 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -131,6 +131,7 @@ import Data.Bifunctor (bimap, first) import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import Data.Composition ((.:), (.:.), (.::), (.::.)) +import Data.Containers.ListUtils (nubOrd) import Data.Either (isRight, rights) import Data.Foldable (foldl', toList) import Data.Functor (($>)) @@ -959,7 +960,7 @@ subscribeConnections' c connIds = do errs' = M.map (Left . storeError) errs (subRs, rcvQs) = M.mapEither rcvQueueOrResult cs mapM_ (mapM_ (\(cData, sqs) -> mapM_ (lift . resumeMsgDelivery c cData) sqs) . sndQueue) cs - mapM_ (resumeConnCmds c) $ M.keys cs + lift $ resumeConnCmds c $ M.keys cs rcvRs <- lift $ connResults . fst <$> subscribeQueues c (concat $ M.elems rcvQs) ns <- asks ntfSupervisor tkn <- readTVarIO (ntfTkn ns) @@ -1118,13 +1119,10 @@ resumeSrvCmds :: AgentClient -> Maybe SMPServer -> AM' () resumeSrvCmds = void .: getAsyncCmdWorker False {-# INLINE resumeSrvCmds #-} -resumeConnCmds :: AgentClient -> ConnId -> AM () -resumeConnCmds c connId = - unlessM connQueued $ - withStore' c (`getPendingCommandServers` connId) - >>= mapM_ (lift . resumeSrvCmds c) - where - connQueued = atomically $ isJust <$> TM.lookupInsert connId True (connCmdsQueued c) +resumeConnCmds :: AgentClient -> [ConnId] -> AM' () +resumeConnCmds c connIds = do + srvs <- nubOrd . concat . rights <$> withStoreBatch' c (\db -> fmap (getPendingCommandServers db) connIds) + mapM_ (resumeSrvCmds c) srvs getAsyncCmdWorker :: Bool -> AgentClient -> Maybe SMPServer -> AM' Worker getAsyncCmdWorker hasWork c server = diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 23f0a98d1..02b31cb95 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -313,7 +313,6 @@ data AgentClient = AgentClient workerSeq :: TVar Int, smpDeliveryWorkers :: TMap SndQAddr (Worker, TMVar ()), asyncCmdWorkers :: TMap (Maybe SMPServer) Worker, - connCmdsQueued :: TMap ConnId Bool, ntfNetworkOp :: TVar AgentOpState, rcvNetworkOp :: TVar AgentOpState, msgDeliveryOp :: TVar AgentOpState, @@ -480,7 +479,6 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg} currentTs a workerSeq <- newTVarIO 0 smpDeliveryWorkers <- TM.emptyIO asyncCmdWorkers <- TM.emptyIO - connCmdsQueued <- TM.emptyIO ntfNetworkOp <- newTVarIO $ AgentOpState False 0 rcvNetworkOp <- newTVarIO $ AgentOpState False 0 msgDeliveryOp <- newTVarIO $ AgentOpState False 0 @@ -519,7 +517,6 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg} currentTs a workerSeq, smpDeliveryWorkers, asyncCmdWorkers, - connCmdsQueued, ntfNetworkOp, rcvNetworkOp, msgDeliveryOp, @@ -893,7 +890,6 @@ closeAgentClient c = do atomically (swapTVar (smpSubWorkers c) M.empty) >>= mapM_ cancelReconnect clearWorkers smpDeliveryWorkers >>= mapM_ (cancelWorker . fst) clearWorkers asyncCmdWorkers >>= mapM_ cancelWorker - clear connCmdsQueued atomically . RQ.clear $ activeSubs c atomically . RQ.clear $ pendingSubs c clear subscrConns diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index 20f382d40..e0e4fc58f 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -1909,9 +1909,11 @@ newQueueId_ (Only maxId : _) = DBQueueId (maxId + 1) getConn :: DB.Connection -> ConnId -> IO (Either StoreError SomeConn) getConn = getAnyConn False +{-# INLINE getConn #-} getDeletedConn :: DB.Connection -> ConnId -> IO (Either StoreError SomeConn) getDeletedConn = getAnyConn True +{-# INLINE getDeletedConn #-} getAnyConn :: Bool -> DB.Connection -> ConnId -> IO (Either StoreError SomeConn) getAnyConn deleted' dbConn connId = @@ -1932,9 +1934,11 @@ getAnyConn deleted' dbConn connId = getConns :: DB.Connection -> [ConnId] -> IO [Either StoreError SomeConn] getConns = getAnyConns_ False +{-# INLINE getConns #-} getDeletedConns :: DB.Connection -> [ConnId] -> IO [Either StoreError SomeConn] getDeletedConns = getAnyConns_ True +{-# INLINE getDeletedConns #-} getAnyConns_ :: Bool -> DB.Connection -> [ConnId] -> IO [Either StoreError SomeConn] getAnyConns_ deleted' db connIds = forM connIds $ E.handle handleDBError . getAnyConn deleted' db