agent: reduce subscription transactions (#1259)

* agent: reduce subscription transactions

* nub

* remove commented
This commit is contained in:
Evgeny
2024-08-16 10:06:22 +01:00
committed by GitHub
parent 0dd52dc69f
commit f229e135e3
3 changed files with 10 additions and 12 deletions

View File

@@ -131,6 +131,7 @@ import Data.Bifunctor (bimap, first)
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.Composition ((.:), (.:.), (.::), (.::.))
import Data.Containers.ListUtils (nubOrd)
import Data.Either (isRight, rights)
import Data.Foldable (foldl', toList)
import Data.Functor (($>))
@@ -959,7 +960,7 @@ subscribeConnections' c connIds = do
errs' = M.map (Left . storeError) errs
(subRs, rcvQs) = M.mapEither rcvQueueOrResult cs
mapM_ (mapM_ (\(cData, sqs) -> mapM_ (lift . resumeMsgDelivery c cData) sqs) . sndQueue) cs
mapM_ (resumeConnCmds c) $ M.keys cs
lift $ resumeConnCmds c $ M.keys cs
rcvRs <- lift $ connResults . fst <$> subscribeQueues c (concat $ M.elems rcvQs)
ns <- asks ntfSupervisor
tkn <- readTVarIO (ntfTkn ns)
@@ -1118,13 +1119,10 @@ resumeSrvCmds :: AgentClient -> Maybe SMPServer -> AM' ()
resumeSrvCmds = void .: getAsyncCmdWorker False
{-# INLINE resumeSrvCmds #-}
resumeConnCmds :: AgentClient -> ConnId -> AM ()
resumeConnCmds c connId =
unlessM connQueued $
withStore' c (`getPendingCommandServers` connId)
>>= mapM_ (lift . resumeSrvCmds c)
where
connQueued = atomically $ isJust <$> TM.lookupInsert connId True (connCmdsQueued c)
resumeConnCmds :: AgentClient -> [ConnId] -> AM' ()
resumeConnCmds c connIds = do
srvs <- nubOrd . concat . rights <$> withStoreBatch' c (\db -> fmap (getPendingCommandServers db) connIds)
mapM_ (resumeSrvCmds c) srvs
getAsyncCmdWorker :: Bool -> AgentClient -> Maybe SMPServer -> AM' Worker
getAsyncCmdWorker hasWork c server =

View File

@@ -313,7 +313,6 @@ data AgentClient = AgentClient
workerSeq :: TVar Int,
smpDeliveryWorkers :: TMap SndQAddr (Worker, TMVar ()),
asyncCmdWorkers :: TMap (Maybe SMPServer) Worker,
connCmdsQueued :: TMap ConnId Bool,
ntfNetworkOp :: TVar AgentOpState,
rcvNetworkOp :: TVar AgentOpState,
msgDeliveryOp :: TVar AgentOpState,
@@ -480,7 +479,6 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg} currentTs a
workerSeq <- newTVarIO 0
smpDeliveryWorkers <- TM.emptyIO
asyncCmdWorkers <- TM.emptyIO
connCmdsQueued <- TM.emptyIO
ntfNetworkOp <- newTVarIO $ AgentOpState False 0
rcvNetworkOp <- newTVarIO $ AgentOpState False 0
msgDeliveryOp <- newTVarIO $ AgentOpState False 0
@@ -519,7 +517,6 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg} currentTs a
workerSeq,
smpDeliveryWorkers,
asyncCmdWorkers,
connCmdsQueued,
ntfNetworkOp,
rcvNetworkOp,
msgDeliveryOp,
@@ -893,7 +890,6 @@ closeAgentClient c = do
atomically (swapTVar (smpSubWorkers c) M.empty) >>= mapM_ cancelReconnect
clearWorkers smpDeliveryWorkers >>= mapM_ (cancelWorker . fst)
clearWorkers asyncCmdWorkers >>= mapM_ cancelWorker
clear connCmdsQueued
atomically . RQ.clear $ activeSubs c
atomically . RQ.clear $ pendingSubs c
clear subscrConns

View File

@@ -1909,9 +1909,11 @@ newQueueId_ (Only maxId : _) = DBQueueId (maxId + 1)
getConn :: DB.Connection -> ConnId -> IO (Either StoreError SomeConn)
getConn = getAnyConn False
{-# INLINE getConn #-}
getDeletedConn :: DB.Connection -> ConnId -> IO (Either StoreError SomeConn)
getDeletedConn = getAnyConn True
{-# INLINE getDeletedConn #-}
getAnyConn :: Bool -> DB.Connection -> ConnId -> IO (Either StoreError SomeConn)
getAnyConn deleted' dbConn connId =
@@ -1932,9 +1934,11 @@ getAnyConn deleted' dbConn connId =
getConns :: DB.Connection -> [ConnId] -> IO [Either StoreError SomeConn]
getConns = getAnyConns_ False
{-# INLINE getConns #-}
getDeletedConns :: DB.Connection -> [ConnId] -> IO [Either StoreError SomeConn]
getDeletedConns = getAnyConns_ True
{-# INLINE getDeletedConns #-}
getAnyConns_ :: Bool -> DB.Connection -> [ConnId] -> IO [Either StoreError SomeConn]
getAnyConns_ deleted' db connIds = forM connIds $ E.handle handleDBError . getAnyConn deleted' db