mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-27 20:44:29 +00:00
collect clientstats
This commit is contained in:
@@ -445,14 +445,14 @@ runClientTransport :: Transport c => THandleSMP c 'TServer -> M ()
|
||||
runClientTransport h@THandle {connection, params = THandleParams {thVersion, sessionId}} = do
|
||||
q <- asks $ tbqSize . config
|
||||
ts <- liftIO getSystemTime
|
||||
statsIds <- asks statsClients
|
||||
active <- asks clients
|
||||
nextClientId <- asks clientSeq
|
||||
let peerId = getPeerId connection
|
||||
skipStats = False -- TODO: check peerId
|
||||
statsIds' <- asks statsClients
|
||||
c <- atomically $ do
|
||||
new@Client {clientId} <- newClient peerId nextClientId q thVersion sessionId ts
|
||||
unless skipStats $ modifyTVar' statsIds $ IM.insert clientId clientId -- until merged, its own fresh id is its stats id
|
||||
unless skipStats $ modifyTVar' statsIds' $ IM.insert clientId clientId -- until merged, its own fresh id is its stats id
|
||||
modifyTVar' active $ IM.insert clientId new
|
||||
pure new
|
||||
s <- asks server
|
||||
@@ -477,6 +477,7 @@ clientDisconnected c@Client {clientId, subscriptions, connected, sessionId, endT
|
||||
atomically $ modifyTVar' srvSubs $ \cs ->
|
||||
M.foldrWithKey (\sub _ -> M.update deleteCurrentClient sub) cs subs
|
||||
asks clients >>= atomically . (`modifyTVar'` IM.delete clientId)
|
||||
asks statsClients >>= atomically . (`modifyTVar'` IM.delete clientId)
|
||||
tIds <- atomically $ swapTVar endThreads IM.empty
|
||||
liftIO $ mapM_ (mapM_ killThread <=< deRefWeak) tIds
|
||||
where
|
||||
@@ -712,7 +713,13 @@ client clnt@Client {clientId, peerId, subscriptions, ntfSubscriptions, rcvQ, snd
|
||||
stats <- asks serverStats
|
||||
atomically $ modifyTVar' (qCreated stats) (+ 1)
|
||||
atomically $ modifyTVar' (qCount stats) (+ 1)
|
||||
-- TODO: increment client Q counter
|
||||
|
||||
now <- liftIO getCurrentTime
|
||||
statsIds' <- asks statsClients -- TVar (IntMap ClientStatsId)
|
||||
stats' <- asks clientStats -- TVar (IntMap ClientStats)
|
||||
atomically $ withClientStatId statsIds' $ \statsId -> do
|
||||
cs <- getClientStats stats' statsId now
|
||||
modifyTVar' (CS.qCreated cs) $ S.insert rId
|
||||
-- TODO: increment current Q counter in IP timeline
|
||||
-- TODO: increment current Q counter in server timeline
|
||||
case subMode of
|
||||
@@ -867,6 +874,13 @@ client clnt@Client {clientId, peerId, subscriptions, ntfSubscriptions, rcvQ, snd
|
||||
when (notification msgFlags) $ do
|
||||
atomically $ modifyTVar' (msgRecvNtf stats) (+ 1)
|
||||
atomically $ updatePeriodStats (activeQueuesNtf stats) queueId
|
||||
senders' <- asks sendSignedClients
|
||||
stats' <- asks clientStats
|
||||
atomically $ do
|
||||
sender_ <- mapM readTVar =<< TM.lookup (recipientId qr) senders'
|
||||
forM_ sender_ $ \statsId -> do
|
||||
cs_ <- IM.lookup statsId <$> readTVar stats'
|
||||
forM_ cs_ $ \cs -> modifyTVar' (CS.msgDeliveredSigned cs) (+ 1)
|
||||
|
||||
sendMessage :: QueueRec -> MsgFlags -> MsgBody -> M (Transmission BrokerMsg)
|
||||
sendMessage qr msgFlags msgBody
|
||||
@@ -899,67 +913,37 @@ client clnt@Client {clientId, peerId, subscriptions, ntfSubscriptions, rcvQ, snd
|
||||
|
||||
logDebug $ "Senders gonna send..."
|
||||
senders' <- asks sendSignedClients -- TMap RecipientId (TVar ClientStatsId)
|
||||
statIds' <- asks statsClients -- TVar (IntMap ClientStatsId)
|
||||
statsIds' <- asks statsClients -- TVar (IntMap ClientStatsId)
|
||||
stats' <- asks clientStats -- TVar (IntMap ClientStats)
|
||||
now <- liftIO getCurrentTime
|
||||
atomically $ case senderKey qr of
|
||||
Nothing -> withClientStatId statIds' $ \statsId -> do
|
||||
Nothing -> withClientStatId statsIds' $ \statsId -> do
|
||||
-- unsecured queue, no merging
|
||||
cs <- getClientStats stats' statsId now
|
||||
-- XXX: perhaps only merging has to be atomic, with the var on hands, it could be a round of smaller transactions
|
||||
modifyTVar' (CS.msgSentUnsigned cs) (+ 1)
|
||||
Just _ -> withClientStatId statIds' $ \currentStatsId -> do
|
||||
Just _secured -> withClientStatId statsIds' $ \currentStatsId -> do
|
||||
-- secured queue, merging is possible
|
||||
senders <- readTVar senders'
|
||||
statsId <- case M.lookup (recipientId qr) senders of
|
||||
Nothing -> do -- claim queue ownership (should've happened on NEW instead)
|
||||
unsafeIOToSTM . logNote $ "Needs claiming: " <> tshow (strEncode $ recipientId qr, currentStatsId)
|
||||
Nothing -> do
|
||||
newOwner <- newTVar currentStatsId
|
||||
writeTVar senders' $ M.insert (recipientId qr) newOwner senders
|
||||
pure currentStatsId
|
||||
Just owner -> do
|
||||
prevStatsId <- readTVar owner
|
||||
Just sender -> do
|
||||
prevStatsId <- readTVar sender
|
||||
unless (prevStatsId == currentStatsId) $ do
|
||||
unsafeIOToSTM . logNote $ "Needs merge: " <> tshow (currentStatsId, prevStatsId)
|
||||
modifyTVar' statIds' $ IM.insert clientId prevStatsId
|
||||
qsToUpdate <- mergeClientStats stats' prevStatsId currentStatsId
|
||||
unsafeIOToSTM . logNote $ "Queues to transfer: " <> tshow (currentStatsId, prevStatsId, qsToUpdate)
|
||||
unless (S.null qsToUpdate) $ writeTVar senders' $ S.foldl' (\os k -> M.insert k owner os) senders qsToUpdate
|
||||
modifyTVar' statsIds' $ IM.insert clientId prevStatsId
|
||||
qsToTransfer <- mergeClientStats stats' prevStatsId currentStatsId
|
||||
unless (S.null qsToTransfer) $ writeTVar senders' $ S.foldl' (\os k -> M.insert k sender os) senders qsToTransfer
|
||||
pure prevStatsId
|
||||
cs <- getClientStats stats' statsId now
|
||||
modifyTVar' (CS.qSentSigned cs) $ S.insert (recipientId qr)
|
||||
modifyTVar' (CS.msgSentSigned cs) (+ 1)
|
||||
unsafeIOToSTM . logWarn $ "msgSentSigned +1 for " <> tshow (clientId, currentStatsId, statsId)
|
||||
-- TODO: increment current S counter in IP timeline
|
||||
-- TODO: increment current S counter in server timeline
|
||||
pure ok
|
||||
where
|
||||
-- missing clientId entry means the client is exempt from stats
|
||||
withClientStatId statIds' action = readTVar statIds' >>= mapM_ action . IM.lookup clientId
|
||||
|
||||
getClientStats stats' statsId now = do
|
||||
stats <- readTVar stats'
|
||||
case IM.lookup statsId stats of
|
||||
Nothing -> do
|
||||
new <- CS.newClientStats newTVar peerId now
|
||||
writeTVar stats' $ IM.insert statsId new stats
|
||||
pure new
|
||||
Just cs -> cs <$ writeTVar (CS.updatedAt cs) now
|
||||
|
||||
mergeClientStats :: TVar (IntMap CS.ClientStats) -> CS.ClientStatsId -> CS.ClientStatsId -> STM (Set RecipientId)
|
||||
mergeClientStats stats' prevId curId = do
|
||||
stats <- readTVar stats'
|
||||
case (IM.lookup prevId stats, IM.lookup curId stats) of
|
||||
(_, Nothing) -> pure mempty
|
||||
(Nothing, Just cur@CS.ClientStats {qCreated}) -> do
|
||||
writeTVar stats' $ IM.insert prevId cur (IM.delete curId stats)
|
||||
readTVar qCreated
|
||||
(Just prev, Just cur) -> do
|
||||
curData@CS.ClientStatsData {_qCreated} <- CS.readClientStatsData readTVar cur
|
||||
prevData <- CS.readClientStatsData readTVar prev
|
||||
CS.writeClientStatsData prev $ CS.mergeClientStatsData prevData curData
|
||||
writeTVar stats' $ IM.delete curId stats
|
||||
pure _qCreated
|
||||
|
||||
mkMessage :: C.MaxLenBS MaxMessageLen -> M Message
|
||||
mkMessage body = do
|
||||
msgId <- randomId =<< asks (msgIdBytes . config)
|
||||
@@ -1063,6 +1047,33 @@ client clnt@Client {clientId, peerId, subscriptions, ntfSubscriptions, rcvQ, snd
|
||||
okResp :: Either ErrorType () -> Transmission BrokerMsg
|
||||
okResp = either err $ const ok
|
||||
|
||||
-- missing clientId entry means the client is exempt from stats
|
||||
withClientStatId statsIds' action = readTVar statsIds' >>= mapM_ action . IM.lookup clientId
|
||||
|
||||
getClientStats stats' statsId now = do
|
||||
stats <- readTVar stats'
|
||||
case IM.lookup statsId stats of
|
||||
Nothing -> do
|
||||
new <- CS.newClientStats newTVar peerId now
|
||||
writeTVar stats' $ IM.insert statsId new stats
|
||||
pure new
|
||||
Just cs -> cs <$ writeTVar (CS.updatedAt cs) now
|
||||
|
||||
mergeClientStats :: TVar (IntMap CS.ClientStats) -> CS.ClientStatsId -> CS.ClientStatsId -> STM (Set RecipientId)
|
||||
mergeClientStats stats' prevId curId = do
|
||||
stats <- readTVar stats'
|
||||
case (IM.lookup prevId stats, IM.lookup curId stats) of
|
||||
(_, Nothing) -> pure mempty
|
||||
(Nothing, Just cur@CS.ClientStats {qCreated}) -> do
|
||||
writeTVar stats' $ IM.insert prevId cur (IM.delete curId stats)
|
||||
readTVar qCreated
|
||||
(Just prev, Just cur) -> do
|
||||
curData@CS.ClientStatsData {_qCreated} <- CS.readClientStatsData readTVar cur
|
||||
prevData <- CS.readClientStatsData readTVar prev
|
||||
CS.writeClientStatsData prev $ CS.mergeClientStatsData prevData curData
|
||||
writeTVar stats' $ IM.delete curId stats
|
||||
pure _qCreated
|
||||
|
||||
updateDeletedStats :: QueueRec -> M ()
|
||||
updateDeletedStats q = do
|
||||
stats <- asks serverStats
|
||||
|
||||
@@ -121,8 +121,8 @@ data Env = Env
|
||||
qCreatedByIp :: Timeline Int,
|
||||
msgSentByIp :: Timeline Int,
|
||||
clientStats :: TVar (IntMap ClientStats), -- transitive session stats
|
||||
statsClients :: TVar (IntMap ClientStatsId), -- reverse index from active clients
|
||||
sendSignedClients :: TMap RecipientId (TVar ClientStatsId), -- reverse index from queues to their owners
|
||||
statsClients :: TVar (IntMap ClientStatsId), -- reverse index from sockets
|
||||
sendSignedClients :: TMap RecipientId (TVar ClientStatsId), -- reverse index from queues to their senders
|
||||
sockets :: SocketState,
|
||||
clientSeq :: TVar Int,
|
||||
clients :: TVar (IntMap Client)
|
||||
|
||||
@@ -8,24 +8,10 @@
|
||||
|
||||
module Simplex.Messaging.Server.Stats.Client where
|
||||
|
||||
import Control.Applicative (optional, (<|>))
|
||||
import qualified Data.Attoparsec.ByteString.Char8 as A
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.IntMap (IntMap)
|
||||
import qualified Data.IntMap.Strict as IM
|
||||
import Data.IntPSQ (IntPSQ)
|
||||
import qualified Data.IntPSQ as IP
|
||||
import Data.IntSet (IntSet)
|
||||
import qualified Data.IntSet as IS
|
||||
import Data.Monoid (getSum)
|
||||
import Data.Set (Set)
|
||||
import qualified Data.Set as S
|
||||
import Data.Time.Calendar.Month (pattern MonthDay)
|
||||
import Data.Time.Calendar.OrdinalDate (mondayStartWeek)
|
||||
import Data.Time.Clock (NominalDiffTime, UTCTime (..))
|
||||
import Data.Time.Clock.POSIX (POSIXTime)
|
||||
import Data.Word (Word32)
|
||||
import Simplex.Messaging.Encoding.String
|
||||
import Data.Time.Clock (UTCTime (..))
|
||||
import Simplex.Messaging.Protocol (RecipientId)
|
||||
import Simplex.Messaging.Transport (PeerId)
|
||||
import UnliftIO.STM
|
||||
@@ -38,11 +24,11 @@ data ClientStats = ClientStats
|
||||
socketCount :: TVar Int,
|
||||
createdAt :: TVar UTCTime,
|
||||
updatedAt :: TVar UTCTime,
|
||||
qCreated :: TVar (Set RecipientId), -- can be IntSet with QueueRecIDs
|
||||
qCreated :: TVar (Set RecipientId), -- can be IntSet with QueueRecIDs, for dumping into suspicous
|
||||
qSentSigned :: TVar (Set RecipientId), -- can be IntSet with QueueRecIDs
|
||||
msgSentSigned :: TVar Int,
|
||||
msgSentUnsigned :: TVar Int,
|
||||
msgSentViaProxy :: TVar Int,
|
||||
msgSentViaProxy :: TVar Int, -- TODO
|
||||
msgDeliveredSigned :: TVar Int
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user