mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-04-14 16:06:16 +00:00
smp server: stats for commands retrieving messages when notifications are received (#1236)
* more stats * fix stats
This commit is contained in:
committed by
GitHub
parent
7565ddd91c
commit
cc5732f41f
@@ -69,6 +69,7 @@ import Data.Type.Equality
|
||||
import GHC.Stats (getRTSStats)
|
||||
import GHC.TypeLits (KnownNat)
|
||||
import Network.Socket (ServiceName, Socket, socketToHandle)
|
||||
import Numeric.Natural (Natural)
|
||||
import Simplex.Messaging.Agent.Lock
|
||||
import Simplex.Messaging.Client (ProtocolClient (thParams), ProtocolClientError (..), SMPClient, SMPClientError, forwardSMPTransmission, smpProxyError, temporaryClientError)
|
||||
import Simplex.Messaging.Client.Agent (OwnServer, SMPClientAgent (..), SMPClientAgentEvent (..), closeSMPClientAgent, getSMPServerClient'', isOwnServer, lookupSMPServerClient, getConnectedSMPServerClient)
|
||||
@@ -395,27 +396,28 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
|
||||
hPutStrLn h . B.unpack $ B.intercalate "," [bshow cid, encode sessionId, connected', strEncode createdAt, rcvActiveAt', sndActiveAt', bshow age, subscriptions']
|
||||
CPStats -> withUserRole $ do
|
||||
ss <- unliftIO u $ asks serverStats
|
||||
let putStat :: Show a => ByteString -> (ServerStats -> TVar a) -> IO ()
|
||||
putStat label var = readTVarIO (var ss) >>= \v -> B.hPutStr h $ label <> ": " <> bshow v <> "\n"
|
||||
putProxyStat :: ByteString -> (ServerStats -> ProxyStats) -> IO ()
|
||||
let getStat :: (ServerStats -> TVar a) -> IO a
|
||||
getStat var = readTVarIO (var ss)
|
||||
putStat :: Show a => String -> (ServerStats -> TVar a) -> IO ()
|
||||
putStat label var = getStat var >>= \v -> hPutStrLn h $ label <> ": " <> show v
|
||||
putProxyStat :: String -> (ServerStats -> ProxyStats) -> IO ()
|
||||
putProxyStat label var = do
|
||||
ProxyStatsData {_pRequests, _pSuccesses, _pErrorsConnect, _pErrorsCompat, _pErrorsOther} <- atomically $ getProxyStatsData $ var ss
|
||||
B.hPutStr h $ label <> ": requests=" <> bshow _pRequests <> ", successes=" <> bshow _pSuccesses <> ", errorsConnect=" <> bshow _pErrorsConnect <> ", errorsCompat=" <> bshow _pErrorsCompat <> ", errorsOther=" <> bshow _pErrorsOther <> "\n"
|
||||
hPutStrLn h $ label <> ": requests=" <> show _pRequests <> ", successes=" <> show _pSuccesses <> ", errorsConnect=" <> show _pErrorsConnect <> ", errorsCompat=" <> show _pErrorsCompat <> ", errorsOther=" <> show _pErrorsOther
|
||||
putStat "fromTime" fromTime
|
||||
putStat "qCreated" qCreated
|
||||
putStat "qSecured" qSecured
|
||||
putStat "qDeletedAll" qDeletedAll
|
||||
putStat "qDeletedNew" qDeletedNew
|
||||
putStat "qDeletedSecured" qDeletedSecured
|
||||
readTVarIO (day $ activeQueues ss) >>= \v -> B.hPutStr h $ "dayMsgQueues" <> ": " <> bshow (S.size v) <> "\n"
|
||||
getStat (day . activeQueues) >>= \v -> hPutStrLn h $ "dayMsgQueues: " <> show (S.size v)
|
||||
subs <- (,,,,) <$> getStat qSub <*> getStat qSubNoMsg <*> getStat qSubAuth <*> getStat qSubDuplicate <*> getStat qSubProhibited
|
||||
hPutStrLn h $ "SUBs (count, noMsg, auth, duplicate, prohibited): " <> show subs
|
||||
putStat "msgSent" msgSent
|
||||
putStat "msgRecv" msgRecv
|
||||
putStat "msgRecvGet" msgRecvGet
|
||||
putStat "msgGet" msgGet
|
||||
putStat "msgGetNoMsg" msgGetNoMsg
|
||||
putStat "msgGetAuth" msgGetAuth
|
||||
putStat "msgGetDuplicate" msgGetDuplicate
|
||||
putStat "msgGetProhibited" msgGetProhibited
|
||||
gets <- (,,,,) <$> getStat msgGet <*> getStat msgGetNoMsg <*> getStat msgGetAuth <*> getStat msgGetDuplicate <*> getStat msgGetProhibited
|
||||
hPutStrLn h $ "GETs (count, noMsg, auth, duplicate, prohibited): " <> show gets
|
||||
putStat "msgSentNtf" msgSentNtf
|
||||
putStat "msgRecvNtf" msgRecvNtf
|
||||
putStat "qCount" qCount
|
||||
@@ -474,30 +476,53 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
|
||||
activeClients <- readTVarIO clients
|
||||
hPutStrLn h $ "Clients: " <> show (IM.size activeClients)
|
||||
when (r == CPRAdmin) $ do
|
||||
(smpSubCnt, smpSubCntByGroup, smpClCnt) <- countClientSubs subscriptions countSMPSubs activeClients
|
||||
(ntfSubCnt, _, ntfClCnt) <- countClientSubs ntfSubscriptions (\_ -> pure (0, 0, 0, 0)) activeClients
|
||||
hPutStrLn h $ "SMP subscriptions (via clients, slow): " <> show smpSubCnt
|
||||
clQs <- clientTBQueueLengths activeClients
|
||||
hPutStrLn h $ "Client queues (rcvQ, sndQ, msgQ): " <> show clQs
|
||||
(smpSubCnt, smpSubCntByGroup, smpClCnt, smpClQs) <- countClientSubs subscriptions (Just countSMPSubs) activeClients
|
||||
hPutStrLn h $ "SMP subscriptions (via clients): " <> show smpSubCnt
|
||||
hPutStrLn h $ "SMP subscriptions (by group: NoSub, SubPending, SubThread, ProhibitSub): " <> show smpSubCntByGroup
|
||||
hPutStrLn h $ "SMP subscribed clients (via clients, slow): " <> show smpClCnt
|
||||
hPutStrLn h $ "Ntf subscriptions (via clients, slow): " <> show ntfSubCnt
|
||||
hPutStrLn h $ "Ntf subscribed clients (via clients, slow): " <> show ntfClCnt
|
||||
activeSubs <- readTVarIO subscribers
|
||||
activeNtfSubs <- readTVarIO notifiers
|
||||
hPutStrLn h $ "SMP subscriptions: " <> show (M.size activeSubs)
|
||||
hPutStrLn h $ "SMP subscribed clients: " <> show (countSubClients activeSubs)
|
||||
hPutStrLn h $ "Ntf subscriptions: " <> show (M.size activeNtfSubs)
|
||||
hPutStrLn h $ "Ntf subscribed clients: " <> show (countSubClients activeNtfSubs)
|
||||
hPutStrLn h $ "SMP subscribed clients (via clients): " <> show smpClCnt
|
||||
hPutStrLn h $ "SMP subscribed clients queues (via clients, rcvQ, sndQ, msgQ): " <> show smpClQs
|
||||
(ntfSubCnt, _, ntfClCnt, ntfClQs) <- countClientSubs ntfSubscriptions Nothing activeClients
|
||||
hPutStrLn h $ "Ntf subscriptions (via clients): " <> show ntfSubCnt
|
||||
hPutStrLn h $ "Ntf subscribed clients (via clients): " <> show ntfClCnt
|
||||
hPutStrLn h $ "Ntf subscribed clients queues (via clients, rcvQ, sndQ, msgQ): " <> show ntfClQs
|
||||
putActiveClientsInfo "SMP" subscribers
|
||||
putActiveClientsInfo "Ntf" notifiers
|
||||
where
|
||||
countClientSubs :: (Client -> TMap QueueId a) -> (M.Map QueueId a -> IO (Int, Int, Int, Int)) -> IM.IntMap Client -> IO (Int, (Int, Int, Int, Int), Int)
|
||||
countClientSubs subSel countSubs = foldM addSubs (0, (0, 0, 0, 0), 0)
|
||||
putActiveClientsInfo :: String -> TMap QueueId Client -> IO ()
|
||||
putActiveClientsInfo protoName clients = do
|
||||
activeSubs <- readTVarIO clients
|
||||
hPutStrLn h $ protoName <> " subscriptions: " <> show (M.size activeSubs)
|
||||
hPutStrLn h $ protoName <> " subscribed clients: " <> show (countSubClients activeSubs)
|
||||
when (r == CPRAdmin) $ do
|
||||
clQs <- clientTBQueueLengths activeSubs
|
||||
hPutStrLn h $ protoName <> " subscribed clients queues (rcvQ, sndQ, msgQ): " <> show clQs
|
||||
countClientSubs :: (Client -> TMap QueueId a) -> Maybe (M.Map QueueId a -> IO (Int, Int, Int, Int)) -> IM.IntMap Client -> IO (Int, (Int, Int, Int, Int), Int, (Natural, Natural, Natural))
|
||||
countClientSubs subSel countSubs_ = foldM addSubs (0, (0, 0, 0, 0), 0, (0, 0, 0))
|
||||
where
|
||||
addSubs :: (Int, (Int, Int, Int, Int), Int) -> Client -> IO (Int, (Int, Int, Int, Int), Int)
|
||||
addSubs (subCnt, (c1, c2, c3, c4), clCnt) cl = do
|
||||
addSubs :: (Int, (Int, Int, Int, Int), Int, (Natural, Natural, Natural)) -> Client -> IO (Int, (Int, Int, Int, Int), Int, (Natural, Natural, Natural))
|
||||
addSubs (!subCnt, cnts@(!c1, !c2, !c3, !c4), !clCnt, !qs) cl = do
|
||||
subs <- readTVarIO $ subSel cl
|
||||
(c1', c2', c3', c4') <- countSubs subs
|
||||
cnts' <- case countSubs_ of
|
||||
Nothing -> pure cnts
|
||||
Just countSubs -> do
|
||||
(c1', c2', c3', c4') <- countSubs subs
|
||||
pure (c1 + c1', c2 + c2', c3 + c3', c4 + c4')
|
||||
let cnt = M.size subs
|
||||
cnts' = (c1 + c1', c2 + c2', c3 + c3', c4 + c4')
|
||||
pure (subCnt + cnt, cnts', clCnt + if cnt == 0 then 0 else 1)
|
||||
clCnt' = if cnt == 0 then clCnt else clCnt + 1
|
||||
qs' <- if cnt == 0 then pure qs else addQueueLengths qs cl
|
||||
pure (subCnt + cnt, cnts', clCnt', qs')
|
||||
clientTBQueueLengths :: Foldable t => t Client -> IO (Natural, Natural, Natural)
|
||||
clientTBQueueLengths = foldM addQueueLengths (0, 0, 0)
|
||||
addQueueLengths (!rl, !sl, !ml) cl = do
|
||||
(rl', sl', ml') <- queueLengths cl
|
||||
pure (rl + rl', sl + sl', ml + ml')
|
||||
queueLengths Client {rcvQ, sndQ, msgQ} = do
|
||||
rl <- atomically $ lengthTBQueue rcvQ
|
||||
sl <- atomically $ lengthTBQueue sndQ
|
||||
ml <- atomically $ lengthTBQueue msgQ
|
||||
pure (rl, sl, ml)
|
||||
countSMPSubs :: M.Map QueueId Sub -> IO (Int, Int, Int, Int)
|
||||
countSMPSubs = foldM countSubs (0, 0, 0, 0)
|
||||
where
|
||||
|
||||
Reference in New Issue
Block a user