mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-06-07 11:02:05 +00:00
smp server: add stats for queues deleted before and after they were secured (#1002)
This commit is contained in:
committed by
GitHub
parent
155831ae36
commit
9b38f69e7e
@@ -204,7 +204,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
|
||||
initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime
|
||||
liftIO $ putStrLn $ "server stats log enabled: " <> statsFilePath
|
||||
liftIO $ threadDelay' $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0)
|
||||
ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount} <- asks serverStats
|
||||
ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedNew, qDeletedSecured, msgSent, msgRecv, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount} <- asks serverStats
|
||||
let interval = 1000000 * logInterval
|
||||
forever $ do
|
||||
withFile statsFilePath AppendMode $ \h -> liftIO $ do
|
||||
@@ -213,7 +213,9 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
|
||||
fromTime' <- atomically $ swapTVar fromTime ts
|
||||
qCreated' <- atomically $ swapTVar qCreated 0
|
||||
qSecured' <- atomically $ swapTVar qSecured 0
|
||||
qDeleted' <- atomically $ swapTVar qDeleted 0
|
||||
qDeletedAll' <- atomically $ swapTVar qDeletedAll 0
|
||||
qDeletedNew' <- atomically $ swapTVar qDeletedNew 0
|
||||
qDeletedSecured' <- atomically $ swapTVar qDeletedSecured 0
|
||||
msgSent' <- atomically $ swapTVar msgSent 0
|
||||
msgRecv' <- atomically $ swapTVar msgRecv 0
|
||||
msgExpired' <- atomically $ swapTVar msgExpired 0
|
||||
@@ -229,7 +231,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
|
||||
[ iso8601Show $ utctDay fromTime',
|
||||
show qCreated',
|
||||
show qSecured',
|
||||
show qDeleted',
|
||||
show qDeletedAll',
|
||||
show msgSent',
|
||||
show msgRecv',
|
||||
dayCount ps,
|
||||
@@ -242,7 +244,9 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
|
||||
monthCount psNtf,
|
||||
show qCount',
|
||||
show msgCount',
|
||||
show msgExpired'
|
||||
show msgExpired',
|
||||
show qDeletedNew',
|
||||
show qDeletedSecured'
|
||||
]
|
||||
liftIO $ threadDelay' interval
|
||||
|
||||
@@ -299,11 +303,13 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
|
||||
subscriptions' <- bshow . M.size <$> readTVarIO subscriptions
|
||||
hPutStrLn h . B.unpack $ B.intercalate "," [bshow cid, encode sessionId, connected', strEncode createdAt, rcvActiveAt', sndActiveAt', bshow age, subscriptions']
|
||||
CPStats -> do
|
||||
ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, msgSentNtf, msgRecvNtf, qCount, msgCount} <- unliftIO u $ asks serverStats
|
||||
ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedNew, qDeletedSecured, msgSent, msgRecv, msgSentNtf, msgRecvNtf, qCount, msgCount} <- unliftIO u $ asks serverStats
|
||||
putStat "fromTime" fromTime
|
||||
putStat "qCreated" qCreated
|
||||
putStat "qSecured" qSecured
|
||||
putStat "qDeleted" qDeleted
|
||||
putStat "qDeletedAll" qDeletedAll
|
||||
putStat "qDeletedNew" qDeletedNew
|
||||
putStat "qDeletedSecured" qDeletedSecured
|
||||
putStat "msgSent" msgSent
|
||||
putStat "msgRecv" msgRecv
|
||||
putStat "msgSentNtf" msgSentNtf
|
||||
@@ -350,19 +356,17 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
|
||||
CPDelete queueId' -> unliftIO u $ do
|
||||
st <- asks queueStore
|
||||
ms <- asks msgStore
|
||||
stats <- asks serverStats
|
||||
queueId <- atomically (getQueue st SSender queueId') >>= \case
|
||||
Left _ -> pure queueId' -- fallback to using as recipientId directly
|
||||
Right QueueRec {recipientId} -> pure recipientId
|
||||
r <- atomically $
|
||||
deleteQueue st queueId $>>= \() ->
|
||||
Right <$> delMsgQueueSize ms queueId
|
||||
deleteQueue st queueId $>>= \q ->
|
||||
Right . (q,) <$> delMsgQueueSize ms queueId
|
||||
case r of
|
||||
Left e -> liftIO . hPutStrLn h $ "error: " <> show e
|
||||
Right numDeleted -> do
|
||||
Right (q, numDeleted) -> do
|
||||
withLog (`logDeleteQueue` queueId)
|
||||
atomically $ modifyTVar' (qDeleted stats) (+ 1)
|
||||
atomically $ modifyTVar' (qCount stats) (subtract 1)
|
||||
updateDeletedStats q
|
||||
liftIO . hPutStrLn h $ "ok, " <> show numDeleted <> " messages deleted"
|
||||
CPSave -> withLock (savingLock srv) "control" $ do
|
||||
hPutStrLn h "saving server state..."
|
||||
@@ -878,13 +882,9 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId} Serv
|
||||
delQueueAndMsgs st = do
|
||||
withLog (`logDeleteQueue` queueId)
|
||||
ms <- asks msgStore
|
||||
stats <- asks serverStats
|
||||
atomically $ modifyTVar' (qDeleted stats) (+ 1)
|
||||
atomically $ modifyTVar' (qCount stats) (subtract 1)
|
||||
atomically $
|
||||
deleteQueue st queueId >>= \case
|
||||
Left e -> pure $ err e
|
||||
Right _ -> delMsgQueue ms queueId $> ok
|
||||
atomically (deleteQueue st queueId $>>= \q -> delMsgQueue ms queueId $> Right q) >>= \case
|
||||
Right q -> updateDeletedStats q $> ok
|
||||
Left e -> pure $ err e
|
||||
|
||||
ok :: Transmission BrokerMsg
|
||||
ok = (corrId, queueId, OK)
|
||||
@@ -895,6 +895,14 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId} Serv
|
||||
okResp :: Either ErrorType () -> Transmission BrokerMsg
|
||||
okResp = either err $ const ok
|
||||
|
||||
updateDeletedStats :: (MonadUnliftIO m, MonadReader Env m) => QueueRec -> m ()
|
||||
updateDeletedStats q = do
|
||||
stats <- asks serverStats
|
||||
let delSel = if isNothing (senderKey q) then qDeletedNew else qDeletedSecured
|
||||
atomically $ modifyTVar' (delSel stats) (+ 1)
|
||||
atomically $ modifyTVar' (qDeletedAll stats) (+ 1)
|
||||
atomically $ modifyTVar' (qCount stats) (subtract 1)
|
||||
|
||||
withLog :: (MonadUnliftIO m, MonadReader Env m) => (StoreLog 'WriteMode -> IO a) -> m ()
|
||||
withLog action = do
|
||||
env <- ask
|
||||
|
||||
@@ -94,14 +94,14 @@ suspendQueue :: QueueStore -> RecipientId -> STM (Either ErrorType ())
|
||||
suspendQueue QueueStore {queues} rId =
|
||||
withQueue rId queues $ \qVar -> modifyTVar' qVar (\q -> q {status = QueueOff}) $> Just ()
|
||||
|
||||
deleteQueue :: QueueStore -> RecipientId -> STM (Either ErrorType ())
|
||||
deleteQueue :: QueueStore -> RecipientId -> STM (Either ErrorType QueueRec)
|
||||
deleteQueue QueueStore {queues, senders, notifiers} rId = do
|
||||
TM.lookupDelete rId queues >>= \case
|
||||
Just qVar ->
|
||||
readTVar qVar >>= \q -> do
|
||||
TM.delete (senderId q) senders
|
||||
forM_ (notifier q) $ \NtfCreds {notifierId} -> TM.delete notifierId notifiers
|
||||
pure $ Right ()
|
||||
pure $ Right q
|
||||
_ -> pure $ Left AUTH
|
||||
|
||||
toResult :: Maybe a -> Either ErrorType a
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE PatternSynonyms #-}
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
{-# LANGUAGE TupleSections #-}
|
||||
|
||||
module Simplex.Messaging.Server.Stats where
|
||||
|
||||
@@ -22,7 +23,9 @@ data ServerStats = ServerStats
|
||||
{ fromTime :: TVar UTCTime,
|
||||
qCreated :: TVar Int,
|
||||
qSecured :: TVar Int,
|
||||
qDeleted :: TVar Int,
|
||||
qDeletedAll :: TVar Int,
|
||||
qDeletedNew :: TVar Int,
|
||||
qDeletedSecured :: TVar Int,
|
||||
msgSent :: TVar Int,
|
||||
msgRecv :: TVar Int,
|
||||
msgExpired :: TVar Int,
|
||||
@@ -38,7 +41,9 @@ data ServerStatsData = ServerStatsData
|
||||
{ _fromTime :: UTCTime,
|
||||
_qCreated :: Int,
|
||||
_qSecured :: Int,
|
||||
_qDeleted :: Int,
|
||||
_qDeletedAll :: Int,
|
||||
_qDeletedNew :: Int,
|
||||
_qDeletedSecured :: Int,
|
||||
_msgSent :: Int,
|
||||
_msgRecv :: Int,
|
||||
_msgExpired :: Int,
|
||||
@@ -56,7 +61,9 @@ newServerStats ts = do
|
||||
fromTime <- newTVar ts
|
||||
qCreated <- newTVar 0
|
||||
qSecured <- newTVar 0
|
||||
qDeleted <- newTVar 0
|
||||
qDeletedAll <- newTVar 0
|
||||
qDeletedNew <- newTVar 0
|
||||
qDeletedSecured <- newTVar 0
|
||||
msgSent <- newTVar 0
|
||||
msgRecv <- newTVar 0
|
||||
msgExpired <- newTVar 0
|
||||
@@ -66,14 +73,16 @@ newServerStats ts = do
|
||||
activeQueuesNtf <- newPeriodStats
|
||||
qCount <- newTVar 0
|
||||
msgCount <- newTVar 0
|
||||
pure ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount}
|
||||
pure ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedNew, qDeletedSecured, msgSent, msgRecv, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount}
|
||||
|
||||
getServerStatsData :: ServerStats -> STM ServerStatsData
|
||||
getServerStatsData s = do
|
||||
_fromTime <- readTVar $ fromTime s
|
||||
_qCreated <- readTVar $ qCreated s
|
||||
_qSecured <- readTVar $ qSecured s
|
||||
_qDeleted <- readTVar $ qDeleted s
|
||||
_qDeletedAll <- readTVar $ qDeletedAll s
|
||||
_qDeletedNew <- readTVar $ qDeletedNew s
|
||||
_qDeletedSecured <- readTVar $ qDeletedSecured s
|
||||
_msgSent <- readTVar $ msgSent s
|
||||
_msgRecv <- readTVar $ msgRecv s
|
||||
_msgExpired <- readTVar $ msgExpired s
|
||||
@@ -83,14 +92,16 @@ getServerStatsData s = do
|
||||
_activeQueuesNtf <- getPeriodStatsData $ activeQueuesNtf s
|
||||
_qCount <- readTVar $ qCount s
|
||||
_msgCount <- readTVar $ msgCount s
|
||||
pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _msgExpired, _activeQueues, _msgSentNtf, _msgRecvNtf, _activeQueuesNtf, _qCount, _msgCount}
|
||||
pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeletedAll, _qDeletedNew, _qDeletedSecured, _msgSent, _msgRecv, _msgExpired, _activeQueues, _msgSentNtf, _msgRecvNtf, _activeQueuesNtf, _qCount, _msgCount}
|
||||
|
||||
setServerStats :: ServerStats -> ServerStatsData -> STM ()
|
||||
setServerStats s d = do
|
||||
writeTVar (fromTime s) $! _fromTime d
|
||||
writeTVar (qCreated s) $! _qCreated d
|
||||
writeTVar (qSecured s) $! _qSecured d
|
||||
writeTVar (qDeleted s) $! _qDeleted d
|
||||
writeTVar (qDeletedAll s) $! _qDeletedAll d
|
||||
writeTVar (qDeletedNew s) $! _qDeletedNew d
|
||||
writeTVar (qDeletedSecured s) $! _qDeletedSecured d
|
||||
writeTVar (msgSent s) $! _msgSent d
|
||||
writeTVar (msgRecv s) $! _msgRecv d
|
||||
writeTVar (msgExpired s) $! _msgExpired d
|
||||
@@ -102,12 +113,14 @@ setServerStats s d = do
|
||||
writeTVar (msgCount s) $! _msgCount d
|
||||
|
||||
instance StrEncoding ServerStatsData where
|
||||
strEncode ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _msgExpired, _msgSentNtf, _msgRecvNtf, _activeQueues, _activeQueuesNtf, _qCount, _msgCount} =
|
||||
strEncode ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeletedAll, _qDeletedNew, _qDeletedSecured, _msgSent, _msgRecv, _msgExpired, _msgSentNtf, _msgRecvNtf, _activeQueues, _activeQueuesNtf, _qCount, _msgCount} =
|
||||
B.unlines
|
||||
[ "fromTime=" <> strEncode _fromTime,
|
||||
"qCreated=" <> strEncode _qCreated,
|
||||
"qSecured=" <> strEncode _qSecured,
|
||||
"qDeleted=" <> strEncode _qDeleted,
|
||||
"qDeletedAll=" <> strEncode _qDeletedAll,
|
||||
"qDeletedNew=" <> strEncode _qDeletedNew,
|
||||
"qDeletedSecured=" <> strEncode _qDeletedSecured,
|
||||
"qCount=" <> strEncode _qCount,
|
||||
"msgSent=" <> strEncode _msgSent,
|
||||
"msgRecv=" <> strEncode _msgRecv,
|
||||
@@ -123,7 +136,9 @@ instance StrEncoding ServerStatsData where
|
||||
_fromTime <- "fromTime=" *> strP <* A.endOfLine
|
||||
_qCreated <- "qCreated=" *> strP <* A.endOfLine
|
||||
_qSecured <- "qSecured=" *> strP <* A.endOfLine
|
||||
_qDeleted <- "qDeleted=" *> strP <* A.endOfLine
|
||||
(_qDeletedAll, _qDeletedNew, _qDeletedSecured) <-
|
||||
(,0,0) <$> ("qDeleted=" *> strP <* A.endOfLine)
|
||||
<|> ((,,) <$> ("qDeletedAll=" *> strP <* A.endOfLine) <*> ("qDeletedNew=" *> strP <* A.endOfLine) <*> ("qDeletedSecured=" *> strP <* A.endOfLine))
|
||||
_qCount <- "qCount=" *> strP <* A.endOfLine <|> pure 0
|
||||
_msgSent <- "msgSent=" *> strP <* A.endOfLine
|
||||
_msgRecv <- "msgRecv=" *> strP <* A.endOfLine
|
||||
@@ -142,7 +157,7 @@ instance StrEncoding ServerStatsData where
|
||||
optional ("activeQueuesNtf:" <* A.endOfLine) >>= \case
|
||||
Just _ -> strP <* optional A.endOfLine
|
||||
_ -> pure newPeriodStatsData
|
||||
pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _msgExpired, _msgSentNtf, _msgRecvNtf, _activeQueues, _activeQueuesNtf, _qCount, _msgCount = 0}
|
||||
pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeletedAll, _qDeletedNew, _qDeletedSecured, _msgSent, _msgRecv, _msgExpired, _msgSentNtf, _msgRecvNtf, _activeQueues, _activeQueuesNtf, _qCount, _msgCount = 0}
|
||||
|
||||
data PeriodStats a = PeriodStats
|
||||
{ day :: TVar (Set a),
|
||||
|
||||
@@ -604,7 +604,7 @@ testRestoreMessages at@(ATransport t) =
|
||||
|
||||
logSize testStoreLogFile `shouldReturn` 2
|
||||
logSize testStoreMsgsFile `shouldReturn` 5
|
||||
logSize testServerStatsBackupFile `shouldReturn` 18
|
||||
logSize testServerStatsBackupFile `shouldReturn` 20
|
||||
Right stats1 <- strDecode <$> B.readFile testServerStatsBackupFile
|
||||
checkStats stats1 [rId] 5 1
|
||||
|
||||
@@ -622,7 +622,7 @@ testRestoreMessages at@(ATransport t) =
|
||||
logSize testStoreLogFile `shouldReturn` 1
|
||||
-- the last message is not removed because it was not ACK'd
|
||||
logSize testStoreMsgsFile `shouldReturn` 3
|
||||
logSize testServerStatsBackupFile `shouldReturn` 18
|
||||
logSize testServerStatsBackupFile `shouldReturn` 20
|
||||
Right stats2 <- strDecode <$> B.readFile testServerStatsBackupFile
|
||||
checkStats stats2 [rId] 5 3
|
||||
|
||||
@@ -641,7 +641,7 @@ testRestoreMessages at@(ATransport t) =
|
||||
|
||||
logSize testStoreLogFile `shouldReturn` 1
|
||||
logSize testStoreMsgsFile `shouldReturn` 0
|
||||
logSize testServerStatsBackupFile `shouldReturn` 18
|
||||
logSize testServerStatsBackupFile `shouldReturn` 20
|
||||
Right stats3 <- strDecode <$> B.readFile testServerStatsBackupFile
|
||||
checkStats stats3 [rId] 5 5
|
||||
|
||||
@@ -661,7 +661,9 @@ checkStats :: ServerStatsData -> [RecipientId] -> Int -> Int -> Expectation
|
||||
checkStats s qs sent received = do
|
||||
_qCreated s `shouldBe` length qs
|
||||
_qSecured s `shouldBe` length qs
|
||||
_qDeleted s `shouldBe` 0
|
||||
_qDeletedAll s `shouldBe` 0
|
||||
_qDeletedNew s `shouldBe` 0
|
||||
_qDeletedSecured s `shouldBe` 0
|
||||
_msgSent s `shouldBe` sent
|
||||
_msgRecv s `shouldBe` received
|
||||
_msgSentNtf s `shouldBe` 0
|
||||
|
||||
Reference in New Issue
Block a user