From ed88441cbc8f099db6a3b1cef0cc91635fff09e2 Mon Sep 17 00:00:00 2001 From: Alexander Bondarenko <486682+dpwiz@users.noreply.github.com> Date: Wed, 15 May 2024 14:12:46 +0300 Subject: [PATCH] collect clientstats --- src/Simplex/Messaging/Server.hs | 97 +++++++++++--------- src/Simplex/Messaging/Server/Env/STM.hs | 4 +- src/Simplex/Messaging/Server/Stats/Client.hs | 20 +--- 3 files changed, 59 insertions(+), 62 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 50cc14991..63f6e2a3a 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -445,14 +445,14 @@ runClientTransport :: Transport c => THandleSMP c 'TServer -> M () runClientTransport h@THandle {connection, params = THandleParams {thVersion, sessionId}} = do q <- asks $ tbqSize . config ts <- liftIO getSystemTime - statsIds <- asks statsClients active <- asks clients nextClientId <- asks clientSeq let peerId = getPeerId connection skipStats = False -- TODO: check peerId + statsIds' <- asks statsClients c <- atomically $ do new@Client {clientId} <- newClient peerId nextClientId q thVersion sessionId ts - unless skipStats $ modifyTVar' statsIds $ IM.insert clientId clientId -- until merged, its own fresh id is its stats id + unless skipStats $ modifyTVar' statsIds' $ IM.insert clientId clientId -- until merged, its own fresh id is its stats id modifyTVar' active $ IM.insert clientId new pure new s <- asks server @@ -477,6 +477,7 @@ clientDisconnected c@Client {clientId, subscriptions, connected, sessionId, endT atomically $ modifyTVar' srvSubs $ \cs -> M.foldrWithKey (\sub _ -> M.update deleteCurrentClient sub) cs subs asks clients >>= atomically . (`modifyTVar'` IM.delete clientId) + asks statsClients >>= atomically . (`modifyTVar'` IM.delete clientId) tIds <- atomically $ swapTVar endThreads IM.empty liftIO $ mapM_ (mapM_ killThread <=< deRefWeak) tIds where @@ -712,7 +713,13 @@ client clnt@Client {clientId, peerId, subscriptions, ntfSubscriptions, rcvQ, snd stats <- asks serverStats atomically $ modifyTVar' (qCreated stats) (+ 1) atomically $ modifyTVar' (qCount stats) (+ 1) - -- TODO: increment client Q counter + + now <- liftIO getCurrentTime + statsIds' <- asks statsClients -- TVar (IntMap ClientStatsId) + stats' <- asks clientStats -- TVar (IntMap ClientStats) + atomically $ withClientStatId statsIds' $ \statsId -> do + cs <- getClientStats stats' statsId now + modifyTVar' (CS.qCreated cs) $ S.insert rId -- TODO: increment current Q counter in IP timeline -- TODO: increment current Q counter in server timeline case subMode of @@ -867,6 +874,13 @@ client clnt@Client {clientId, peerId, subscriptions, ntfSubscriptions, rcvQ, snd when (notification msgFlags) $ do atomically $ modifyTVar' (msgRecvNtf stats) (+ 1) atomically $ updatePeriodStats (activeQueuesNtf stats) queueId + senders' <- asks sendSignedClients + stats' <- asks clientStats + atomically $ do + sender_ <- mapM readTVar =<< TM.lookup (recipientId qr) senders' + forM_ sender_ $ \statsId -> do + cs_ <- IM.lookup statsId <$> readTVar stats' + forM_ cs_ $ \cs -> modifyTVar' (CS.msgDeliveredSigned cs) (+ 1) sendMessage :: QueueRec -> MsgFlags -> MsgBody -> M (Transmission BrokerMsg) sendMessage qr msgFlags msgBody @@ -899,67 +913,37 @@ client clnt@Client {clientId, peerId, subscriptions, ntfSubscriptions, rcvQ, snd logDebug $ "Senders gonna send..." senders' <- asks sendSignedClients -- TMap RecipientId (TVar ClientStatsId) - statIds' <- asks statsClients -- TVar (IntMap ClientStatsId) + statsIds' <- asks statsClients -- TVar (IntMap ClientStatsId) stats' <- asks clientStats -- TVar (IntMap ClientStats) now <- liftIO getCurrentTime atomically $ case senderKey qr of - Nothing -> withClientStatId statIds' $ \statsId -> do + Nothing -> withClientStatId statsIds' $ \statsId -> do -- unsecured queue, no merging cs <- getClientStats stats' statsId now -- XXX: perhaps only merging has to be atomic, with the var on hands, it could be a round of smaller transactions modifyTVar' (CS.msgSentUnsigned cs) (+ 1) - Just _ -> withClientStatId statIds' $ \currentStatsId -> do + Just _secured -> withClientStatId statsIds' $ \currentStatsId -> do -- secured queue, merging is possible senders <- readTVar senders' statsId <- case M.lookup (recipientId qr) senders of - Nothing -> do -- claim queue ownership (should've happened on NEW instead) - unsafeIOToSTM . logNote $ "Needs claiming: " <> tshow (strEncode $ recipientId qr, currentStatsId) + Nothing -> do newOwner <- newTVar currentStatsId writeTVar senders' $ M.insert (recipientId qr) newOwner senders pure currentStatsId - Just owner -> do - prevStatsId <- readTVar owner + Just sender -> do + prevStatsId <- readTVar sender unless (prevStatsId == currentStatsId) $ do - unsafeIOToSTM . logNote $ "Needs merge: " <> tshow (currentStatsId, prevStatsId) - modifyTVar' statIds' $ IM.insert clientId prevStatsId - qsToUpdate <- mergeClientStats stats' prevStatsId currentStatsId - unsafeIOToSTM . logNote $ "Queues to transfer: " <> tshow (currentStatsId, prevStatsId, qsToUpdate) - unless (S.null qsToUpdate) $ writeTVar senders' $ S.foldl' (\os k -> M.insert k owner os) senders qsToUpdate + modifyTVar' statsIds' $ IM.insert clientId prevStatsId + qsToTransfer <- mergeClientStats stats' prevStatsId currentStatsId + unless (S.null qsToTransfer) $ writeTVar senders' $ S.foldl' (\os k -> M.insert k sender os) senders qsToTransfer pure prevStatsId cs <- getClientStats stats' statsId now + modifyTVar' (CS.qSentSigned cs) $ S.insert (recipientId qr) modifyTVar' (CS.msgSentSigned cs) (+ 1) - unsafeIOToSTM . logWarn $ "msgSentSigned +1 for " <> tshow (clientId, currentStatsId, statsId) -- TODO: increment current S counter in IP timeline -- TODO: increment current S counter in server timeline pure ok where - -- missing clientId entry means the client is exempt from stats - withClientStatId statIds' action = readTVar statIds' >>= mapM_ action . IM.lookup clientId - - getClientStats stats' statsId now = do - stats <- readTVar stats' - case IM.lookup statsId stats of - Nothing -> do - new <- CS.newClientStats newTVar peerId now - writeTVar stats' $ IM.insert statsId new stats - pure new - Just cs -> cs <$ writeTVar (CS.updatedAt cs) now - - mergeClientStats :: TVar (IntMap CS.ClientStats) -> CS.ClientStatsId -> CS.ClientStatsId -> STM (Set RecipientId) - mergeClientStats stats' prevId curId = do - stats <- readTVar stats' - case (IM.lookup prevId stats, IM.lookup curId stats) of - (_, Nothing) -> pure mempty - (Nothing, Just cur@CS.ClientStats {qCreated}) -> do - writeTVar stats' $ IM.insert prevId cur (IM.delete curId stats) - readTVar qCreated - (Just prev, Just cur) -> do - curData@CS.ClientStatsData {_qCreated} <- CS.readClientStatsData readTVar cur - prevData <- CS.readClientStatsData readTVar prev - CS.writeClientStatsData prev $ CS.mergeClientStatsData prevData curData - writeTVar stats' $ IM.delete curId stats - pure _qCreated - mkMessage :: C.MaxLenBS MaxMessageLen -> M Message mkMessage body = do msgId <- randomId =<< asks (msgIdBytes . config) @@ -1063,6 +1047,33 @@ client clnt@Client {clientId, peerId, subscriptions, ntfSubscriptions, rcvQ, snd okResp :: Either ErrorType () -> Transmission BrokerMsg okResp = either err $ const ok + -- missing clientId entry means the client is exempt from stats + withClientStatId statsIds' action = readTVar statsIds' >>= mapM_ action . IM.lookup clientId + + getClientStats stats' statsId now = do + stats <- readTVar stats' + case IM.lookup statsId stats of + Nothing -> do + new <- CS.newClientStats newTVar peerId now + writeTVar stats' $ IM.insert statsId new stats + pure new + Just cs -> cs <$ writeTVar (CS.updatedAt cs) now + + mergeClientStats :: TVar (IntMap CS.ClientStats) -> CS.ClientStatsId -> CS.ClientStatsId -> STM (Set RecipientId) + mergeClientStats stats' prevId curId = do + stats <- readTVar stats' + case (IM.lookup prevId stats, IM.lookup curId stats) of + (_, Nothing) -> pure mempty + (Nothing, Just cur@CS.ClientStats {qCreated}) -> do + writeTVar stats' $ IM.insert prevId cur (IM.delete curId stats) + readTVar qCreated + (Just prev, Just cur) -> do + curData@CS.ClientStatsData {_qCreated} <- CS.readClientStatsData readTVar cur + prevData <- CS.readClientStatsData readTVar prev + CS.writeClientStatsData prev $ CS.mergeClientStatsData prevData curData + writeTVar stats' $ IM.delete curId stats + pure _qCreated + updateDeletedStats :: QueueRec -> M () updateDeletedStats q = do stats <- asks serverStats diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index 28792a0b1..3e4ab2c29 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -121,8 +121,8 @@ data Env = Env qCreatedByIp :: Timeline Int, msgSentByIp :: Timeline Int, clientStats :: TVar (IntMap ClientStats), -- transitive session stats - statsClients :: TVar (IntMap ClientStatsId), -- reverse index from active clients - sendSignedClients :: TMap RecipientId (TVar ClientStatsId), -- reverse index from queues to their owners + statsClients :: TVar (IntMap ClientStatsId), -- reverse index from sockets + sendSignedClients :: TMap RecipientId (TVar ClientStatsId), -- reverse index from queues to their senders sockets :: SocketState, clientSeq :: TVar Int, clients :: TVar (IntMap Client) diff --git a/src/Simplex/Messaging/Server/Stats/Client.hs b/src/Simplex/Messaging/Server/Stats/Client.hs index 717b30e63..285e2b255 100644 --- a/src/Simplex/Messaging/Server/Stats/Client.hs +++ b/src/Simplex/Messaging/Server/Stats/Client.hs @@ -8,24 +8,10 @@ module Simplex.Messaging.Server.Stats.Client where -import Control.Applicative (optional, (<|>)) -import qualified Data.Attoparsec.ByteString.Char8 as A -import qualified Data.ByteString.Char8 as B -import Data.IntMap (IntMap) -import qualified Data.IntMap.Strict as IM -import Data.IntPSQ (IntPSQ) -import qualified Data.IntPSQ as IP import Data.IntSet (IntSet) import qualified Data.IntSet as IS -import Data.Monoid (getSum) import Data.Set (Set) -import qualified Data.Set as S -import Data.Time.Calendar.Month (pattern MonthDay) -import Data.Time.Calendar.OrdinalDate (mondayStartWeek) -import Data.Time.Clock (NominalDiffTime, UTCTime (..)) -import Data.Time.Clock.POSIX (POSIXTime) -import Data.Word (Word32) -import Simplex.Messaging.Encoding.String +import Data.Time.Clock (UTCTime (..)) import Simplex.Messaging.Protocol (RecipientId) import Simplex.Messaging.Transport (PeerId) import UnliftIO.STM @@ -38,11 +24,11 @@ data ClientStats = ClientStats socketCount :: TVar Int, createdAt :: TVar UTCTime, updatedAt :: TVar UTCTime, - qCreated :: TVar (Set RecipientId), -- can be IntSet with QueueRecIDs + qCreated :: TVar (Set RecipientId), -- can be IntSet with QueueRecIDs, for dumping into suspicous qSentSigned :: TVar (Set RecipientId), -- can be IntSet with QueueRecIDs msgSentSigned :: TVar Int, msgSentUnsigned :: TVar Int, - msgSentViaProxy :: TVar Int, + msgSentViaProxy :: TVar Int, -- TODO msgDeliveredSigned :: TVar Int }