mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-31 09:46:11 +00:00
additional SMP server stats (#605)
* additional SMP server stats * refactor
This commit is contained in:
committed by
GitHub
parent
f47e7bf3c5
commit
56cc2bc71f
@@ -104,8 +104,8 @@ type M a = ReaderT Env IO a
|
||||
smpServer :: TMVar Bool -> ServerConfig -> M ()
|
||||
smpServer started cfg@ServerConfig {transports, logTLSErrors} = do
|
||||
s <- asks server
|
||||
restoreServerStats
|
||||
restoreServerMessages
|
||||
restoreServerStats
|
||||
raceAny_
|
||||
( serverThread s subscribedQ subscribers subscriptions cancelSub :
|
||||
serverThread s ntfSubscribedQ notifiers ntfSubscriptions (\_ -> pure ()) :
|
||||
@@ -174,7 +174,7 @@ smpServer started cfg@ServerConfig {transports, logTLSErrors} = do
|
||||
initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime
|
||||
liftIO $ putStrLn $ "server stats log enabled: " <> statsFilePath
|
||||
threadDelay $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0)
|
||||
ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, activeQueues} <- asks serverStats
|
||||
ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount} <- asks serverStats
|
||||
let interval = 1000000 * logInterval
|
||||
withFile statsFilePath AppendMode $ \h -> liftIO $ do
|
||||
hSetBuffering h LineBuffering
|
||||
@@ -187,7 +187,31 @@ smpServer started cfg@ServerConfig {transports, logTLSErrors} = do
|
||||
msgSent' <- atomically $ swapTVar msgSent 0
|
||||
msgRecv' <- atomically $ swapTVar msgRecv 0
|
||||
ps <- atomically $ periodStatCounts activeQueues ts
|
||||
hPutStrLn h $ intercalate "," [iso8601Show $ utctDay fromTime', show qCreated', show qSecured', show qDeleted', show msgSent', show msgRecv', dayCount ps, weekCount ps, monthCount ps]
|
||||
msgSentNtf' <- atomically $ swapTVar msgSentNtf 0
|
||||
msgRecvNtf' <- atomically $ swapTVar msgRecvNtf 0
|
||||
psNtf <- atomically $ periodStatCounts activeQueuesNtf ts
|
||||
qCount' <- readTVarIO qCount
|
||||
msgCount' <- readTVarIO msgCount
|
||||
hPutStrLn h $
|
||||
intercalate
|
||||
","
|
||||
[ iso8601Show $ utctDay fromTime',
|
||||
show qCreated',
|
||||
show qSecured',
|
||||
show qDeleted',
|
||||
show msgSent',
|
||||
show msgRecv',
|
||||
dayCount ps,
|
||||
weekCount ps,
|
||||
monthCount ps,
|
||||
show msgSentNtf',
|
||||
show msgRecvNtf',
|
||||
dayCount psNtf,
|
||||
weekCount psNtf,
|
||||
monthCount psNtf,
|
||||
show qCount',
|
||||
show msgCount'
|
||||
]
|
||||
threadDelay interval
|
||||
|
||||
runClient :: Transport c => TProxy c -> c -> M ()
|
||||
@@ -387,6 +411,7 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Serv
|
||||
withLog (`logCreateById` rId)
|
||||
stats <- asks serverStats
|
||||
atomically $ modifyTVar' (qCreated stats) (+ 1)
|
||||
atomically $ modifyTVar' (qCount stats) (+ 1)
|
||||
subscribeQueue qr rId $> IDS (qik ids)
|
||||
|
||||
logCreateById :: StoreLog 'WriteMode -> RecipientId -> IO ()
|
||||
@@ -509,12 +534,12 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Serv
|
||||
q <- getStoreMsgQueue "ACK" queueId
|
||||
case s of
|
||||
Sub {subThread = ProhibitSub} -> do
|
||||
msgDeleted <- atomically $ tryDelMsg q msgId
|
||||
when msgDeleted updateStats
|
||||
deletedMsg_ <- atomically $ tryDelMsg q msgId
|
||||
mapM_ updateStats deletedMsg_
|
||||
pure ok
|
||||
_ -> do
|
||||
(msgDeleted, msg_) <- atomically $ tryDelPeekMsg q msgId
|
||||
when msgDeleted updateStats
|
||||
(deletedMsg_, msg_) <- atomically $ tryDelPeekMsg q msgId
|
||||
mapM_ updateStats deletedMsg_
|
||||
deliverMessage "ACK" qr queueId sub q msg_
|
||||
_ -> pure $ err NO_MSG
|
||||
where
|
||||
@@ -525,11 +550,17 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Serv
|
||||
if msgId == msgId' || B.null msgId
|
||||
then pure $ Just s
|
||||
else putTMVar delivered msgId' $> Nothing
|
||||
updateStats :: m ()
|
||||
updateStats = do
|
||||
stats <- asks serverStats
|
||||
atomically $ modifyTVar' (msgRecv stats) (+ 1)
|
||||
atomically $ updatePeriodStats (activeQueues stats) queueId
|
||||
updateStats :: Message -> m ()
|
||||
updateStats = \case
|
||||
MessageQuota {} -> pure ()
|
||||
Message {msgFlags} -> do
|
||||
stats <- asks serverStats
|
||||
atomically $ modifyTVar' (msgRecv stats) (+ 1)
|
||||
atomically $ modifyTVar' (msgCount stats) (+ 1)
|
||||
atomically $ updatePeriodStats (activeQueues stats) queueId
|
||||
when (notification msgFlags) $ do
|
||||
atomically $ modifyTVar' (msgRecvNtf stats) (+ 1)
|
||||
atomically $ updatePeriodStats (activeQueuesNtf stats) queueId
|
||||
|
||||
sendMessage :: QueueRec -> MsgFlags -> MsgBody -> m (Transmission BrokerMsg)
|
||||
sendMessage qr msgFlags msgBody
|
||||
@@ -547,10 +578,13 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Serv
|
||||
case msg_ of
|
||||
Nothing -> pure $ err QUOTA
|
||||
Just msg -> time "SEND ok" $ do
|
||||
when (notification msgFlags) $
|
||||
atomically . trySendNotification msg =<< asks idsDrg
|
||||
stats <- asks serverStats
|
||||
when (notification msgFlags) $ do
|
||||
atomically . trySendNotification msg =<< asks idsDrg
|
||||
atomically $ modifyTVar' (msgSentNtf stats) (+ 1)
|
||||
atomically $ updatePeriodStats (activeQueuesNtf stats) (recipientId qr)
|
||||
atomically $ modifyTVar' (msgSent stats) (+ 1)
|
||||
atomically $ modifyTVar' (msgCount stats) (subtract 1)
|
||||
atomically $ updatePeriodStats (activeQueues stats) (recipientId qr)
|
||||
pure ok
|
||||
where
|
||||
@@ -647,6 +681,7 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Serv
|
||||
ms <- asks msgStore
|
||||
stats <- asks serverStats
|
||||
atomically $ modifyTVar' (qDeleted stats) (+ 1)
|
||||
atomically $ modifyTVar' (qCount stats) (subtract 1)
|
||||
atomically $
|
||||
deleteQueue st queueId >>= \case
|
||||
Left e -> pure $ err e
|
||||
@@ -755,7 +790,9 @@ restoreServerStats = asks (serverStatsBackupFile . config) >>= mapM_ restoreStat
|
||||
liftIO (strDecode <$> B.readFile f) >>= \case
|
||||
Right d -> do
|
||||
s <- asks serverStats
|
||||
atomically $ setServerStats s d
|
||||
_qCount <- fmap (length . M.keys) . readTVarIO . queues =<< asks queueStore
|
||||
_msgCount <- foldM (\n q -> (n +) <$> readTVarIO (size q)) 0 =<< readTVarIO =<< asks msgStore
|
||||
atomically $ setServerStats s d {_qCount, _msgCount}
|
||||
renameFile f $ f <> ".bak"
|
||||
logInfo "server stats restored"
|
||||
Left e -> do
|
||||
|
||||
@@ -8,7 +8,7 @@
|
||||
|
||||
module Simplex.Messaging.Server.MsgStore.STM
|
||||
( STMMsgStore,
|
||||
MsgQueue,
|
||||
MsgQueue (..),
|
||||
newMsgStore,
|
||||
getMsgQueue,
|
||||
delMsgQueue,
|
||||
@@ -86,22 +86,22 @@ peekMsg :: MsgQueue -> STM Message
|
||||
peekMsg = peekTQueue . msgQueue
|
||||
{-# INLINE peekMsg #-}
|
||||
|
||||
tryDelMsg :: MsgQueue -> MsgId -> STM Bool
|
||||
tryDelMsg :: MsgQueue -> MsgId -> STM (Maybe Message)
|
||||
tryDelMsg mq msgId' =
|
||||
tryPeekMsg mq >>= \case
|
||||
Just msg
|
||||
| msgId msg == msgId' || B.null msgId' -> tryDeleteMsg mq >> pure True
|
||||
| otherwise -> pure False
|
||||
_ -> pure False
|
||||
msg_@(Just msg)
|
||||
| msgId msg == msgId' || B.null msgId' -> tryDeleteMsg mq >> pure msg_
|
||||
| otherwise -> pure Nothing
|
||||
_ -> pure Nothing
|
||||
|
||||
-- atomic delete (== read) last and peek next message if available
|
||||
tryDelPeekMsg :: MsgQueue -> MsgId -> STM (Bool, Maybe Message)
|
||||
tryDelPeekMsg :: MsgQueue -> MsgId -> STM (Maybe Message, Maybe Message)
|
||||
tryDelPeekMsg mq msgId' =
|
||||
tryPeekMsg mq >>= \case
|
||||
msg_@(Just msg)
|
||||
| msgId msg == msgId' || B.null msgId' -> (True,) <$> (tryDeleteMsg mq >> tryPeekMsg mq)
|
||||
| otherwise -> pure (False, msg_)
|
||||
_ -> pure (False, Nothing)
|
||||
| msgId msg == msgId' || B.null msgId' -> (msg_,) <$> (tryDeleteMsg mq >> tryPeekMsg mq)
|
||||
| otherwise -> pure (Nothing, msg_)
|
||||
_ -> pure (Nothing, Nothing)
|
||||
|
||||
deleteExpiredMsgs :: MsgQueue -> Int64 -> STM ()
|
||||
deleteExpiredMsgs mq old = loop
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE PatternSynonyms #-}
|
||||
@@ -5,7 +6,7 @@
|
||||
|
||||
module Simplex.Messaging.Server.Stats where
|
||||
|
||||
import Control.Applicative (optional)
|
||||
import Control.Applicative (optional, (<|>))
|
||||
import qualified Data.Attoparsec.ByteString.Char8 as A
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.Set (Set)
|
||||
@@ -24,7 +25,12 @@ data ServerStats = ServerStats
|
||||
qDeleted :: TVar Int,
|
||||
msgSent :: TVar Int,
|
||||
msgRecv :: TVar Int,
|
||||
activeQueues :: PeriodStats RecipientId
|
||||
activeQueues :: PeriodStats RecipientId,
|
||||
msgSentNtf :: TVar Int,
|
||||
msgRecvNtf :: TVar Int,
|
||||
activeQueuesNtf :: PeriodStats RecipientId,
|
||||
qCount :: TVar Int,
|
||||
msgCount :: TVar Int
|
||||
}
|
||||
|
||||
data ServerStatsData = ServerStatsData
|
||||
@@ -34,7 +40,12 @@ data ServerStatsData = ServerStatsData
|
||||
_qDeleted :: Int,
|
||||
_msgSent :: Int,
|
||||
_msgRecv :: Int,
|
||||
_activeQueues :: PeriodStatsData RecipientId
|
||||
_activeQueues :: PeriodStatsData RecipientId,
|
||||
_msgSentNtf :: Int,
|
||||
_msgRecvNtf :: Int,
|
||||
_activeQueuesNtf :: PeriodStatsData RecipientId,
|
||||
_qCount :: Int,
|
||||
_msgCount :: Int
|
||||
}
|
||||
|
||||
newServerStats :: UTCTime -> STM ServerStats
|
||||
@@ -46,7 +57,12 @@ newServerStats ts = do
|
||||
msgSent <- newTVar 0
|
||||
msgRecv <- newTVar 0
|
||||
activeQueues <- newPeriodStats
|
||||
pure ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, activeQueues}
|
||||
msgSentNtf <- newTVar 0
|
||||
msgRecvNtf <- newTVar 0
|
||||
activeQueuesNtf <- newPeriodStats
|
||||
qCount <- newTVar 0
|
||||
msgCount <- newTVar 0
|
||||
pure ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount}
|
||||
|
||||
getServerStatsData :: ServerStats -> STM ServerStatsData
|
||||
getServerStatsData s = do
|
||||
@@ -57,7 +73,12 @@ getServerStatsData s = do
|
||||
_msgSent <- readTVar $ msgSent s
|
||||
_msgRecv <- readTVar $ msgRecv s
|
||||
_activeQueues <- getPeriodStatsData $ activeQueues s
|
||||
pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _activeQueues}
|
||||
_msgSentNtf <- readTVar $ msgSentNtf s
|
||||
_msgRecvNtf <- readTVar $ msgRecvNtf s
|
||||
_activeQueuesNtf <- getPeriodStatsData $ activeQueuesNtf s
|
||||
_qCount <- readTVar $ qCount s
|
||||
_msgCount <- readTVar $ msgCount s
|
||||
pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _activeQueues, _msgSentNtf, _msgRecvNtf, _activeQueuesNtf, _qCount, _msgCount}
|
||||
|
||||
setServerStats :: ServerStats -> ServerStatsData -> STM ()
|
||||
setServerStats s d = do
|
||||
@@ -67,10 +88,15 @@ setServerStats s d = do
|
||||
writeTVar (qDeleted s) $! _qDeleted d
|
||||
writeTVar (msgSent s) $! _msgSent d
|
||||
writeTVar (msgRecv s) $! _msgRecv d
|
||||
setPeriodStats (activeQueues s) (_activeQueues d)
|
||||
setPeriodStats (activeQueuesNtf s) (_activeQueuesNtf d)
|
||||
writeTVar (msgSentNtf s) $! _msgSentNtf d
|
||||
writeTVar (msgRecvNtf s) $! _msgRecvNtf d
|
||||
setPeriodStats (activeQueuesNtf s) (_activeQueuesNtf d)
|
||||
writeTVar (qCount s) $! _qCount d
|
||||
writeTVar (msgCount s) $! _qCount d
|
||||
|
||||
instance StrEncoding ServerStatsData where
|
||||
strEncode ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _activeQueues} =
|
||||
strEncode ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _msgSentNtf, _msgRecvNtf, _activeQueues, _activeQueuesNtf} =
|
||||
B.unlines
|
||||
[ "fromTime=" <> strEncode _fromTime,
|
||||
"qCreated=" <> strEncode _qCreated,
|
||||
@@ -78,8 +104,12 @@ instance StrEncoding ServerStatsData where
|
||||
"qDeleted=" <> strEncode _qDeleted,
|
||||
"msgSent=" <> strEncode _msgSent,
|
||||
"msgRecv=" <> strEncode _msgRecv,
|
||||
"msgSentNtf=" <> strEncode _msgSentNtf,
|
||||
"msgRecvNtf=" <> strEncode _msgRecvNtf,
|
||||
"activeQueues:",
|
||||
strEncode _activeQueues
|
||||
strEncode _activeQueues,
|
||||
"activeQueuesNtf:",
|
||||
strEncode _activeQueuesNtf
|
||||
]
|
||||
strP = do
|
||||
_fromTime <- "fromTime=" *> strP <* A.endOfLine
|
||||
@@ -88,15 +118,21 @@ instance StrEncoding ServerStatsData where
|
||||
_qDeleted <- "qDeleted=" *> strP <* A.endOfLine
|
||||
_msgSent <- "msgSent=" *> strP <* A.endOfLine
|
||||
_msgRecv <- "msgRecv=" *> strP <* A.endOfLine
|
||||
r <- optional ("activeQueues:" <* A.endOfLine)
|
||||
_activeQueues <- case r of
|
||||
Just _ -> strP <* optional A.endOfLine
|
||||
_ -> do
|
||||
_day <- "dayMsgQueues=" *> strP <* A.endOfLine
|
||||
_week <- "weekMsgQueues=" *> strP <* A.endOfLine
|
||||
_month <- "monthMsgQueues=" *> strP <* optional A.endOfLine
|
||||
pure PeriodStatsData {_day, _week, _month}
|
||||
pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _activeQueues}
|
||||
_msgSentNtf <- "msgSentNtf=" *> strP <* A.endOfLine <|> pure 0
|
||||
_msgRecvNtf <- "msgRecvNtf=" *> strP <* A.endOfLine <|> pure 0
|
||||
_activeQueues <-
|
||||
optional ("activeQueues:" <* A.endOfLine) >>= \case
|
||||
Just _ -> strP <* optional A.endOfLine
|
||||
_ -> do
|
||||
_day <- "dayMsgQueues=" *> strP <* A.endOfLine
|
||||
_week <- "weekMsgQueues=" *> strP <* A.endOfLine
|
||||
_month <- "monthMsgQueues=" *> strP <* optional A.endOfLine
|
||||
pure PeriodStatsData {_day, _week, _month}
|
||||
_activeQueuesNtf <-
|
||||
optional ("activeQueuesNtf:" <* A.endOfLine) >>= \case
|
||||
Just _ -> strP <* optional A.endOfLine
|
||||
_ -> pure newPeriodStatsData
|
||||
pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _msgSentNtf, _msgRecvNtf, _activeQueues, _activeQueuesNtf, _qCount = 0, _msgCount = 0}
|
||||
|
||||
data PeriodStats a = PeriodStats
|
||||
{ day :: TVar (Set a),
|
||||
@@ -117,6 +153,9 @@ data PeriodStatsData a = PeriodStatsData
|
||||
_month :: Set a
|
||||
}
|
||||
|
||||
newPeriodStatsData :: PeriodStatsData a
|
||||
newPeriodStatsData = PeriodStatsData {_day = S.empty, _week = S.empty, _month = S.empty}
|
||||
|
||||
getPeriodStatsData :: PeriodStats a -> STM (PeriodStatsData a)
|
||||
getPeriodStatsData s = do
|
||||
_day <- readTVar $ day s
|
||||
|
||||
Reference in New Issue
Block a user