From 56cc2bc71f3afd54e49892fcfbcb56e910f5b66c Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Sat, 14 Jan 2023 13:23:37 +0000 Subject: [PATCH] additional SMP server stats (#605) * additional SMP server stats * refactor --- src/Simplex/Messaging/Server.hs | 67 ++++++++++++++---- src/Simplex/Messaging/Server/MsgStore/STM.hs | 20 +++--- src/Simplex/Messaging/Server/Stats.hs | 73 +++++++++++++++----- 3 files changed, 118 insertions(+), 42 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 2ae1bfdb5..e5cb7f919 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -104,8 +104,8 @@ type M a = ReaderT Env IO a smpServer :: TMVar Bool -> ServerConfig -> M () smpServer started cfg@ServerConfig {transports, logTLSErrors} = do s <- asks server - restoreServerStats restoreServerMessages + restoreServerStats raceAny_ ( serverThread s subscribedQ subscribers subscriptions cancelSub : serverThread s ntfSubscribedQ notifiers ntfSubscriptions (\_ -> pure ()) : @@ -174,7 +174,7 @@ smpServer started cfg@ServerConfig {transports, logTLSErrors} = do initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime liftIO $ putStrLn $ "server stats log enabled: " <> statsFilePath threadDelay $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0) - ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, activeQueues} <- asks serverStats + ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount} <- asks serverStats let interval = 1000000 * logInterval withFile statsFilePath AppendMode $ \h -> liftIO $ do hSetBuffering h LineBuffering @@ -187,7 +187,31 @@ smpServer started cfg@ServerConfig {transports, logTLSErrors} = do msgSent' <- atomically $ swapTVar msgSent 0 msgRecv' <- atomically $ swapTVar msgRecv 0 ps <- atomically $ periodStatCounts activeQueues ts - hPutStrLn h $ intercalate "," [iso8601Show $ utctDay fromTime', show qCreated', show qSecured', show qDeleted', show msgSent', show msgRecv', dayCount ps, weekCount ps, monthCount ps] + msgSentNtf' <- atomically $ swapTVar msgSentNtf 0 + msgRecvNtf' <- atomically $ swapTVar msgRecvNtf 0 + psNtf <- atomically $ periodStatCounts activeQueuesNtf ts + qCount' <- readTVarIO qCount + msgCount' <- readTVarIO msgCount + hPutStrLn h $ + intercalate + "," + [ iso8601Show $ utctDay fromTime', + show qCreated', + show qSecured', + show qDeleted', + show msgSent', + show msgRecv', + dayCount ps, + weekCount ps, + monthCount ps, + show msgSentNtf', + show msgRecvNtf', + dayCount psNtf, + weekCount psNtf, + monthCount psNtf, + show qCount', + show msgCount' + ] threadDelay interval runClient :: Transport c => TProxy c -> c -> M () @@ -387,6 +411,7 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Serv withLog (`logCreateById` rId) stats <- asks serverStats atomically $ modifyTVar' (qCreated stats) (+ 1) + atomically $ modifyTVar' (qCount stats) (+ 1) subscribeQueue qr rId $> IDS (qik ids) logCreateById :: StoreLog 'WriteMode -> RecipientId -> IO () @@ -509,12 +534,12 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Serv q <- getStoreMsgQueue "ACK" queueId case s of Sub {subThread = ProhibitSub} -> do - msgDeleted <- atomically $ tryDelMsg q msgId - when msgDeleted updateStats + deletedMsg_ <- atomically $ tryDelMsg q msgId + mapM_ updateStats deletedMsg_ pure ok _ -> do - (msgDeleted, msg_) <- atomically $ tryDelPeekMsg q msgId - when msgDeleted updateStats + (deletedMsg_, msg_) <- atomically $ tryDelPeekMsg q msgId + mapM_ updateStats deletedMsg_ deliverMessage "ACK" qr queueId sub q msg_ _ -> pure $ err NO_MSG where @@ -525,11 +550,17 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Serv if msgId == msgId' || B.null msgId then pure $ Just s else putTMVar delivered msgId' $> Nothing - updateStats :: m () - updateStats = do - stats <- asks serverStats - atomically $ modifyTVar' (msgRecv stats) (+ 1) - atomically $ updatePeriodStats (activeQueues stats) queueId + updateStats :: Message -> m () + updateStats = \case + MessageQuota {} -> pure () + Message {msgFlags} -> do + stats <- asks serverStats + atomically $ modifyTVar' (msgRecv stats) (+ 1) + atomically $ modifyTVar' (msgCount stats) (+ 1) + atomically $ updatePeriodStats (activeQueues stats) queueId + when (notification msgFlags) $ do + atomically $ modifyTVar' (msgRecvNtf stats) (+ 1) + atomically $ updatePeriodStats (activeQueuesNtf stats) queueId sendMessage :: QueueRec -> MsgFlags -> MsgBody -> m (Transmission BrokerMsg) sendMessage qr msgFlags msgBody @@ -547,10 +578,13 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Serv case msg_ of Nothing -> pure $ err QUOTA Just msg -> time "SEND ok" $ do - when (notification msgFlags) $ - atomically . trySendNotification msg =<< asks idsDrg stats <- asks serverStats + when (notification msgFlags) $ do + atomically . trySendNotification msg =<< asks idsDrg + atomically $ modifyTVar' (msgSentNtf stats) (+ 1) + atomically $ updatePeriodStats (activeQueuesNtf stats) (recipientId qr) atomically $ modifyTVar' (msgSent stats) (+ 1) + atomically $ modifyTVar' (msgCount stats) (subtract 1) atomically $ updatePeriodStats (activeQueues stats) (recipientId qr) pure ok where @@ -647,6 +681,7 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Serv ms <- asks msgStore stats <- asks serverStats atomically $ modifyTVar' (qDeleted stats) (+ 1) + atomically $ modifyTVar' (qCount stats) (subtract 1) atomically $ deleteQueue st queueId >>= \case Left e -> pure $ err e @@ -755,7 +790,9 @@ restoreServerStats = asks (serverStatsBackupFile . config) >>= mapM_ restoreStat liftIO (strDecode <$> B.readFile f) >>= \case Right d -> do s <- asks serverStats - atomically $ setServerStats s d + _qCount <- fmap (length . M.keys) . readTVarIO . queues =<< asks queueStore + _msgCount <- foldM (\n q -> (n +) <$> readTVarIO (size q)) 0 =<< readTVarIO =<< asks msgStore + atomically $ setServerStats s d {_qCount, _msgCount} renameFile f $ f <> ".bak" logInfo "server stats restored" Left e -> do diff --git a/src/Simplex/Messaging/Server/MsgStore/STM.hs b/src/Simplex/Messaging/Server/MsgStore/STM.hs index 4ecf6d152..e9dd95eec 100644 --- a/src/Simplex/Messaging/Server/MsgStore/STM.hs +++ b/src/Simplex/Messaging/Server/MsgStore/STM.hs @@ -8,7 +8,7 @@ module Simplex.Messaging.Server.MsgStore.STM ( STMMsgStore, - MsgQueue, + MsgQueue (..), newMsgStore, getMsgQueue, delMsgQueue, @@ -86,22 +86,22 @@ peekMsg :: MsgQueue -> STM Message peekMsg = peekTQueue . msgQueue {-# INLINE peekMsg #-} -tryDelMsg :: MsgQueue -> MsgId -> STM Bool +tryDelMsg :: MsgQueue -> MsgId -> STM (Maybe Message) tryDelMsg mq msgId' = tryPeekMsg mq >>= \case - Just msg - | msgId msg == msgId' || B.null msgId' -> tryDeleteMsg mq >> pure True - | otherwise -> pure False - _ -> pure False + msg_@(Just msg) + | msgId msg == msgId' || B.null msgId' -> tryDeleteMsg mq >> pure msg_ + | otherwise -> pure Nothing + _ -> pure Nothing -- atomic delete (== read) last and peek next message if available -tryDelPeekMsg :: MsgQueue -> MsgId -> STM (Bool, Maybe Message) +tryDelPeekMsg :: MsgQueue -> MsgId -> STM (Maybe Message, Maybe Message) tryDelPeekMsg mq msgId' = tryPeekMsg mq >>= \case msg_@(Just msg) - | msgId msg == msgId' || B.null msgId' -> (True,) <$> (tryDeleteMsg mq >> tryPeekMsg mq) - | otherwise -> pure (False, msg_) - _ -> pure (False, Nothing) + | msgId msg == msgId' || B.null msgId' -> (msg_,) <$> (tryDeleteMsg mq >> tryPeekMsg mq) + | otherwise -> pure (Nothing, msg_) + _ -> pure (Nothing, Nothing) deleteExpiredMsgs :: MsgQueue -> Int64 -> STM () deleteExpiredMsgs mq old = loop diff --git a/src/Simplex/Messaging/Server/Stats.hs b/src/Simplex/Messaging/Server/Stats.hs index 3f08d5cb8..82170e90f 100644 --- a/src/Simplex/Messaging/Server/Stats.hs +++ b/src/Simplex/Messaging/Server/Stats.hs @@ -1,3 +1,4 @@ +{-# LANGUAGE LambdaCase #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE PatternSynonyms #-} @@ -5,7 +6,7 @@ module Simplex.Messaging.Server.Stats where -import Control.Applicative (optional) +import Control.Applicative (optional, (<|>)) import qualified Data.Attoparsec.ByteString.Char8 as A import qualified Data.ByteString.Char8 as B import Data.Set (Set) @@ -24,7 +25,12 @@ data ServerStats = ServerStats qDeleted :: TVar Int, msgSent :: TVar Int, msgRecv :: TVar Int, - activeQueues :: PeriodStats RecipientId + activeQueues :: PeriodStats RecipientId, + msgSentNtf :: TVar Int, + msgRecvNtf :: TVar Int, + activeQueuesNtf :: PeriodStats RecipientId, + qCount :: TVar Int, + msgCount :: TVar Int } data ServerStatsData = ServerStatsData @@ -34,7 +40,12 @@ data ServerStatsData = ServerStatsData _qDeleted :: Int, _msgSent :: Int, _msgRecv :: Int, - _activeQueues :: PeriodStatsData RecipientId + _activeQueues :: PeriodStatsData RecipientId, + _msgSentNtf :: Int, + _msgRecvNtf :: Int, + _activeQueuesNtf :: PeriodStatsData RecipientId, + _qCount :: Int, + _msgCount :: Int } newServerStats :: UTCTime -> STM ServerStats @@ -46,7 +57,12 @@ newServerStats ts = do msgSent <- newTVar 0 msgRecv <- newTVar 0 activeQueues <- newPeriodStats - pure ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, activeQueues} + msgSentNtf <- newTVar 0 + msgRecvNtf <- newTVar 0 + activeQueuesNtf <- newPeriodStats + qCount <- newTVar 0 + msgCount <- newTVar 0 + pure ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount} getServerStatsData :: ServerStats -> STM ServerStatsData getServerStatsData s = do @@ -57,7 +73,12 @@ getServerStatsData s = do _msgSent <- readTVar $ msgSent s _msgRecv <- readTVar $ msgRecv s _activeQueues <- getPeriodStatsData $ activeQueues s - pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _activeQueues} + _msgSentNtf <- readTVar $ msgSentNtf s + _msgRecvNtf <- readTVar $ msgRecvNtf s + _activeQueuesNtf <- getPeriodStatsData $ activeQueuesNtf s + _qCount <- readTVar $ qCount s + _msgCount <- readTVar $ msgCount s + pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _activeQueues, _msgSentNtf, _msgRecvNtf, _activeQueuesNtf, _qCount, _msgCount} setServerStats :: ServerStats -> ServerStatsData -> STM () setServerStats s d = do @@ -67,10 +88,15 @@ setServerStats s d = do writeTVar (qDeleted s) $! _qDeleted d writeTVar (msgSent s) $! _msgSent d writeTVar (msgRecv s) $! _msgRecv d - setPeriodStats (activeQueues s) (_activeQueues d) + setPeriodStats (activeQueuesNtf s) (_activeQueuesNtf d) + writeTVar (msgSentNtf s) $! _msgSentNtf d + writeTVar (msgRecvNtf s) $! _msgRecvNtf d + setPeriodStats (activeQueuesNtf s) (_activeQueuesNtf d) + writeTVar (qCount s) $! _qCount d + writeTVar (msgCount s) $! _qCount d instance StrEncoding ServerStatsData where - strEncode ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _activeQueues} = + strEncode ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _msgSentNtf, _msgRecvNtf, _activeQueues, _activeQueuesNtf} = B.unlines [ "fromTime=" <> strEncode _fromTime, "qCreated=" <> strEncode _qCreated, @@ -78,8 +104,12 @@ instance StrEncoding ServerStatsData where "qDeleted=" <> strEncode _qDeleted, "msgSent=" <> strEncode _msgSent, "msgRecv=" <> strEncode _msgRecv, + "msgSentNtf=" <> strEncode _msgSentNtf, + "msgRecvNtf=" <> strEncode _msgRecvNtf, "activeQueues:", - strEncode _activeQueues + strEncode _activeQueues, + "activeQueuesNtf:", + strEncode _activeQueuesNtf ] strP = do _fromTime <- "fromTime=" *> strP <* A.endOfLine @@ -88,15 +118,21 @@ instance StrEncoding ServerStatsData where _qDeleted <- "qDeleted=" *> strP <* A.endOfLine _msgSent <- "msgSent=" *> strP <* A.endOfLine _msgRecv <- "msgRecv=" *> strP <* A.endOfLine - r <- optional ("activeQueues:" <* A.endOfLine) - _activeQueues <- case r of - Just _ -> strP <* optional A.endOfLine - _ -> do - _day <- "dayMsgQueues=" *> strP <* A.endOfLine - _week <- "weekMsgQueues=" *> strP <* A.endOfLine - _month <- "monthMsgQueues=" *> strP <* optional A.endOfLine - pure PeriodStatsData {_day, _week, _month} - pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _activeQueues} + _msgSentNtf <- "msgSentNtf=" *> strP <* A.endOfLine <|> pure 0 + _msgRecvNtf <- "msgRecvNtf=" *> strP <* A.endOfLine <|> pure 0 + _activeQueues <- + optional ("activeQueues:" <* A.endOfLine) >>= \case + Just _ -> strP <* optional A.endOfLine + _ -> do + _day <- "dayMsgQueues=" *> strP <* A.endOfLine + _week <- "weekMsgQueues=" *> strP <* A.endOfLine + _month <- "monthMsgQueues=" *> strP <* optional A.endOfLine + pure PeriodStatsData {_day, _week, _month} + _activeQueuesNtf <- + optional ("activeQueuesNtf:" <* A.endOfLine) >>= \case + Just _ -> strP <* optional A.endOfLine + _ -> pure newPeriodStatsData + pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _msgSentNtf, _msgRecvNtf, _activeQueues, _activeQueuesNtf, _qCount = 0, _msgCount = 0} data PeriodStats a = PeriodStats { day :: TVar (Set a), @@ -117,6 +153,9 @@ data PeriodStatsData a = PeriodStatsData _month :: Set a } +newPeriodStatsData :: PeriodStatsData a +newPeriodStatsData = PeriodStatsData {_day = S.empty, _week = S.empty, _month = S.empty} + getPeriodStatsData :: PeriodStats a -> STM (PeriodStatsData a) getPeriodStatsData s = do _day <- readTVar $ day s