From e639a85bcc7d174ec5e654c01a87fa1d97d3cdc8 Mon Sep 17 00:00:00 2001 From: Alexander Bondarenko <486682+dpwiz@users.noreply.github.com> Date: Tue, 14 May 2024 21:49:27 +0300 Subject: [PATCH] stats merging on sendSigned --- src/Simplex/Messaging/Server.hs | 71 +++++++++++++------- src/Simplex/Messaging/Server/Stats/Client.hs | 28 ++++++++ 2 files changed, 76 insertions(+), 23 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index bda4a4572..50cc14991 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -38,6 +38,7 @@ module Simplex.Messaging.Server ) where +import GHC.Conc (unsafeIOToSTM) import Control.Logger.Simple import Control.Monad import Control.Monad.Except @@ -52,11 +53,14 @@ import qualified Data.ByteString.Lazy.Char8 as LB import Data.Either (fromRight, partitionEithers) import Data.Functor (($>)) import Data.Int (Int64) +import Data.IntMap.Strict (IntMap) import qualified Data.IntMap.Strict as IM import Data.List (intercalate, mapAccumR) import Data.List.NonEmpty (NonEmpty) import qualified Data.List.NonEmpty as L import qualified Data.Map.Strict as M +import Data.Set (Set) +import qualified Data.Set as S import Data.Maybe (isNothing) import qualified Data.Text as T import Data.Text.Encoding (decodeLatin1) @@ -893,39 +897,45 @@ client clnt@Client {clientId, peerId, subscriptions, ntfSubscriptions, rcvQ, snd atomically $ modifyTVar' (msgCount stats) (+ 1) atomically $ updatePeriodStats (activeQueues stats) (recipientId qr) - onwers' <- asks sendSignedClients -- TMap RecipientId (TVar ClientStatsId) + logDebug $ "Senders gonna send..." + senders' <- asks sendSignedClients -- TMap RecipientId (TVar ClientStatsId) statIds' <- asks statsClients -- TVar (IntMap ClientStatsId) stats' <- asks clientStats -- TVar (IntMap ClientStats) now <- liftIO getCurrentTime atomically $ case senderKey qr of - Nothing -> do + Nothing -> withClientStatId statIds' $ \statsId -> do -- unsecured queue, no merging - currentStatsId_ <- IM.lookup clientId <$> readTVar statIds' - forM_ currentStatsId_ $ \statsId -> do - cs <- getClientStats stats' statsId now - -- XXX: perhaps only merging has to be atomic, with the var on hands, it could be a round of smaller transactions - modifyTVar' (CS.msgSentUnsigned cs) (+ 1) - Just _ -> do + cs <- getClientStats stats' statsId now + -- XXX: perhaps only merging has to be atomic, with the var on hands, it could be a round of smaller transactions + modifyTVar' (CS.msgSentUnsigned cs) (+ 1) + Just _ -> withClientStatId statIds' $ \currentStatsId -> do -- secured queue, merging is possible - currentStatsId_ <- IM.lookup clientId <$> readTVar statIds' - forM_ currentStatsId_ $ \currentStatsId -> do - owners <- readTVar onwers' - statsId <- forM (M.lookup (recipientId qr) owners) readTVar >>= \case - Just ownerId | ownerId == currentStatsId -> pure ownerId -- keep going - Just olderSessionId -> do - -- TODO: merge client stats - pure olderSessionId - -- olderSessionId <$ mergeClientStats owners olderSessionId currentStatsId - Nothing -> do -- claim queue ownership (should've happened on NEW instead) - newOwner <- newTVar currentStatsId - writeTVar onwers' $ M.insert (recipientId qr) newOwner owners - pure currentStatsId - cs <- getClientStats stats' statsId now - modifyTVar' (CS.msgSentSigned cs) (+ 1) + senders <- readTVar senders' + statsId <- case M.lookup (recipientId qr) senders of + Nothing -> do -- claim queue ownership (should've happened on NEW instead) + unsafeIOToSTM . logNote $ "Needs claiming: " <> tshow (strEncode $ recipientId qr, currentStatsId) + newOwner <- newTVar currentStatsId + writeTVar senders' $ M.insert (recipientId qr) newOwner senders + pure currentStatsId + Just owner -> do + prevStatsId <- readTVar owner + unless (prevStatsId == currentStatsId) $ do + unsafeIOToSTM . logNote $ "Needs merge: " <> tshow (currentStatsId, prevStatsId) + modifyTVar' statIds' $ IM.insert clientId prevStatsId + qsToUpdate <- mergeClientStats stats' prevStatsId currentStatsId + unsafeIOToSTM . logNote $ "Queues to transfer: " <> tshow (currentStatsId, prevStatsId, qsToUpdate) + unless (S.null qsToUpdate) $ writeTVar senders' $ S.foldl' (\os k -> M.insert k owner os) senders qsToUpdate + pure prevStatsId + cs <- getClientStats stats' statsId now + modifyTVar' (CS.msgSentSigned cs) (+ 1) + unsafeIOToSTM . logWarn $ "msgSentSigned +1 for " <> tshow (clientId, currentStatsId, statsId) -- TODO: increment current S counter in IP timeline -- TODO: increment current S counter in server timeline pure ok where + -- missing clientId entry means the client is exempt from stats + withClientStatId statIds' action = readTVar statIds' >>= mapM_ action . IM.lookup clientId + getClientStats stats' statsId now = do stats <- readTVar stats' case IM.lookup statsId stats of @@ -935,6 +945,21 @@ client clnt@Client {clientId, peerId, subscriptions, ntfSubscriptions, rcvQ, snd pure new Just cs -> cs <$ writeTVar (CS.updatedAt cs) now + mergeClientStats :: TVar (IntMap CS.ClientStats) -> CS.ClientStatsId -> CS.ClientStatsId -> STM (Set RecipientId) + mergeClientStats stats' prevId curId = do + stats <- readTVar stats' + case (IM.lookup prevId stats, IM.lookup curId stats) of + (_, Nothing) -> pure mempty + (Nothing, Just cur@CS.ClientStats {qCreated}) -> do + writeTVar stats' $ IM.insert prevId cur (IM.delete curId stats) + readTVar qCreated + (Just prev, Just cur) -> do + curData@CS.ClientStatsData {_qCreated} <- CS.readClientStatsData readTVar cur + prevData <- CS.readClientStatsData readTVar prev + CS.writeClientStatsData prev $ CS.mergeClientStatsData prevData curData + writeTVar stats' $ IM.delete curId stats + pure _qCreated + mkMessage :: C.MaxLenBS MaxMessageLen -> M Message mkMessage body = do msgId <- randomId =<< asks (msgIdBytes . config) diff --git a/src/Simplex/Messaging/Server/Stats/Client.hs b/src/Simplex/Messaging/Server/Stats/Client.hs index d5badcaca..717b30e63 100644 --- a/src/Simplex/Messaging/Server/Stats/Client.hs +++ b/src/Simplex/Messaging/Server/Stats/Client.hs @@ -113,3 +113,31 @@ readClientStatsData readF cs = do _msgDeliveredSigned } {-# INLINE readClientStatsData #-} + +writeClientStatsData :: ClientStats -> ClientStatsData -> STM () +writeClientStatsData cs csd = do + writeTVar (peerAddresses cs) (_peerAddresses csd) + writeTVar (socketCount cs) (_socketCount csd) + writeTVar (createdAt cs) (_createdAt csd) + writeTVar (updatedAt cs) (_updatedAt csd) + writeTVar (qCreated cs) (_qCreated csd) + writeTVar (qSentSigned cs) (_qSentSigned csd) + writeTVar (msgSentSigned cs) (_msgSentSigned csd) + writeTVar (msgSentUnsigned cs) (_msgSentUnsigned csd) + writeTVar (msgSentViaProxy cs) (_msgSentViaProxy csd) + writeTVar (msgDeliveredSigned cs) (_msgDeliveredSigned csd) + +mergeClientStatsData :: ClientStatsData -> ClientStatsData -> ClientStatsData +mergeClientStatsData a b = + ClientStatsData + { _peerAddresses = _peerAddresses a <> _peerAddresses b, + _socketCount = _socketCount a + _socketCount b, + _createdAt = min (_createdAt a) (_createdAt b), + _updatedAt = max (_updatedAt a) (_updatedAt b), + _qCreated = _qCreated a <> _qCreated b, + _qSentSigned = _qSentSigned a <> _qSentSigned b, + _msgSentSigned = _msgSentSigned a + _msgSentSigned b, + _msgSentUnsigned = _msgSentUnsigned a + _msgSentUnsigned b, + _msgSentViaProxy = _msgSentViaProxy a + _msgSentViaProxy b, + _msgDeliveredSigned = _msgDeliveredSigned a + _msgDeliveredSigned b + }