mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-25 20:44:49 +00:00
optimze, refactor
This commit is contained in:
@@ -163,7 +163,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
|
||||
liftIO $ exitSuccess
|
||||
raceAny_
|
||||
( serverThread "server subscribers" s subscribers subscriptions cancelSub
|
||||
: serverThread "server notifiers" s ntfSubscribers ntfSubscriptions (\_ -> pure ())
|
||||
: serverThread "server ntfSubscribers" s ntfSubscribers ntfSubscriptions (\_ -> pure ())
|
||||
: deliverNtfsThread s
|
||||
: sendPendingEvtsThread s
|
||||
: receiveFromProxyAgent pa
|
||||
@@ -775,24 +775,20 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
|
||||
hPutStrLn h $ "Ntf subscriptions (via clients): " <> show ntfSubCnt
|
||||
hPutStrLn h $ "Ntf subscribed clients (via clients): " <> show ntfClCnt
|
||||
hPutStrLn h $ "Ntf subscribed clients queues (via clients, rcvQ, sndQ, msgQ): " <> show ntfClQs
|
||||
putActiveClientsInfo "SMP" (queueSubscribers subscribers) False
|
||||
putActiveClientsInfo "Ntf" (queueSubscribers ntfSubscribers) True
|
||||
putSubscribedClients "SMP" (subClients subscribers) False
|
||||
putSubscribedClients "Ntf" (subClients ntfSubscribers) True
|
||||
putSubscribersInfo "SMP" subscribers False
|
||||
putSubscribersInfo "Ntf" ntfSubscribers True
|
||||
where
|
||||
putActiveClientsInfo :: String -> SubscribedClients -> Bool -> IO ()
|
||||
putActiveClientsInfo protoName clients showIds = do
|
||||
activeSubs <- getSubscribedClients clients
|
||||
putSubscribersInfo :: String -> ServerSubscribers -> Bool -> IO ()
|
||||
putSubscribersInfo protoName ServerSubscribers {queueSubscribers, subClients} showIds = do
|
||||
activeSubs <- getSubscribedClients queueSubscribers
|
||||
hPutStrLn h $ protoName <> " subscriptions: " <> show (M.size activeSubs)
|
||||
clnts <- countSubClients activeSubs
|
||||
hPutStrLn h $ protoName <> " subscribed clients: " <> show (IS.size clnts) <> (if showIds then " " <> show (IS.toList clnts) else "")
|
||||
clnts' <- readTVarIO subClients
|
||||
hPutStrLn h $ protoName <> " subscribed clients count 2: " <> show (IS.size clnts') <> (if showIds then " " <> show clnts' else "")
|
||||
where
|
||||
countSubClients :: M.Map QueueId (TVar (Maybe AClient)) -> IO IS.IntSet
|
||||
countSubClients = foldM (\ !s c -> maybe s ((`IS.insert` s) . clientId') <$> readTVarIO c) IS.empty
|
||||
putSubscribedClients :: String -> TVar IS.IntSet -> Bool -> IO ()
|
||||
putSubscribedClients protoName subClnts showIds = do
|
||||
clnts <- readTVarIO subClnts
|
||||
hPutStrLn h $ protoName <> " subscribed clients count 2: " <> show (IS.size clnts) <> (if showIds then " " <> show clnts else "")
|
||||
countClientSubs :: (forall s. Client s -> TMap QueueId a) -> Maybe (M.Map QueueId a -> IO (Int, Int, Int, Int)) -> IM.IntMap AClient -> IO (Int, (Int, Int, Int, Int), Int, (Natural, Natural, Natural))
|
||||
countClientSubs subSel countSubs_ = foldM addSubs (0, (0, 0, 0, 0), 0, (0, 0, 0))
|
||||
where
|
||||
@@ -1375,7 +1371,7 @@ client
|
||||
|
||||
subscribeQueue :: StoreQueue s -> QueueRec -> M (Transmission BrokerMsg)
|
||||
subscribeQueue q qr =
|
||||
atomically (TM.lookup rId subscriptions) >>= \case
|
||||
liftIO (TM.lookupIO rId subscriptions) >>= \case
|
||||
Nothing -> newSub >>= deliver True
|
||||
Just s@Sub {subThread} -> do
|
||||
stats <- asks serverStats
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
{-# LANGUAGE TupleSections #-}
|
||||
|
||||
module Simplex.Messaging.TMap
|
||||
( TMap,
|
||||
emptyIO,
|
||||
@@ -11,7 +13,6 @@ module Simplex.Messaging.TMap
|
||||
insert,
|
||||
insertM,
|
||||
delete,
|
||||
lookupInsert,
|
||||
lookupDelete,
|
||||
adjust,
|
||||
update,
|
||||
@@ -71,12 +72,8 @@ delete :: Ord k => k -> TMap k a -> STM ()
|
||||
delete k m = modifyTVar' m $ M.delete k
|
||||
{-# INLINE delete #-}
|
||||
|
||||
lookupInsert :: Ord k => k -> a -> TMap k a -> STM (Maybe a)
|
||||
lookupInsert k v m = stateTVar m $ \mv -> (M.lookup k mv, M.insert k v mv)
|
||||
{-# INLINE lookupInsert #-}
|
||||
|
||||
lookupDelete :: Ord k => k -> TMap k a -> STM (Maybe a)
|
||||
lookupDelete k m = stateTVar m $ \mv -> (M.lookup k mv, M.delete k mv)
|
||||
lookupDelete k m = stateTVar m $ M.alterF (,Nothing) k
|
||||
{-# INLINE lookupDelete #-}
|
||||
|
||||
adjust :: Ord k => (a -> a) -> k -> TMap k a -> STM ()
|
||||
|
||||
Reference in New Issue
Block a user