mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-30 18:35:59 +00:00
smp-server: update stats (#943)
* smp-server: check queue balance in stats vs store * smp-server: add msgExpired stats * add msgExpired stats * split expire/stats transactions * count and pass msgExpired explicitly * save/load qCount and use it for checking store
This commit is contained in:
committed by
GitHub
parent
f954c2cd17
commit
36298f2cea
@@ -1,3 +1,4 @@
|
||||
{-# LANGUAGE BangPatterns #-}
|
||||
{-# LANGUAGE CPP #-}
|
||||
{-# LANGUAGE DataKinds #-}
|
||||
{-# LANGUAGE DuplicateRecordFields #-}
|
||||
@@ -118,8 +119,8 @@ type M a = ReaderT Env IO a
|
||||
smpServer :: TMVar Bool -> ServerConfig -> M ()
|
||||
smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
|
||||
s <- asks server
|
||||
restoreServerMessages
|
||||
restoreServerStats
|
||||
expired <- restoreServerMessages
|
||||
restoreServerStats expired
|
||||
raceAny_
|
||||
( serverThread s "server subscribedQ" subscribedQ subscribers subscriptions cancelSub
|
||||
: serverThread s "server ntfSubscribedQ" ntfSubscribedQ Env.notifiers ntfSubscriptions (\_ -> pure ())
|
||||
@@ -178,14 +179,16 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
|
||||
ms <- asks msgStore
|
||||
quota <- asks $ msgQueueQuota . config
|
||||
let interval = checkInterval expCfg * 1000000
|
||||
stats <- asks serverStats
|
||||
labelMyThread "expireMessages"
|
||||
forever $ do
|
||||
liftIO $ threadDelay' interval
|
||||
old <- liftIO $ expireBeforeEpoch expCfg
|
||||
rIds <- M.keysSet <$> readTVarIO ms
|
||||
forM_ rIds $ \rId ->
|
||||
atomically (getMsgQueue ms rId quota)
|
||||
>>= atomically . (`deleteExpiredMsgs` old)
|
||||
forM_ rIds $ \rId -> do
|
||||
q <- atomically (getMsgQueue ms rId quota)
|
||||
deleted <- atomically $ deleteExpiredMsgs q old
|
||||
atomically $ modifyTVar' (msgExpired stats) (+ deleted)
|
||||
|
||||
serverStatsThread_ :: ServerConfig -> [M ()]
|
||||
serverStatsThread_ ServerConfig {logStatsInterval = Just interval, logStatsStartTime, serverStatsLogFile} =
|
||||
@@ -198,7 +201,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
|
||||
initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime
|
||||
liftIO $ putStrLn $ "server stats log enabled: " <> statsFilePath
|
||||
liftIO $ threadDelay' $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0)
|
||||
ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount} <- asks serverStats
|
||||
ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount} <- asks serverStats
|
||||
let interval = 1000000 * logInterval
|
||||
withFile statsFilePath AppendMode $ \h -> liftIO $ do
|
||||
hSetBuffering h LineBuffering
|
||||
@@ -210,6 +213,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
|
||||
qDeleted' <- atomically $ swapTVar qDeleted 0
|
||||
msgSent' <- atomically $ swapTVar msgSent 0
|
||||
msgRecv' <- atomically $ swapTVar msgRecv 0
|
||||
msgExpired' <- atomically $ swapTVar msgExpired 0
|
||||
ps <- atomically $ periodStatCounts activeQueues ts
|
||||
msgSentNtf' <- atomically $ swapTVar msgSentNtf 0
|
||||
msgRecvNtf' <- atomically $ swapTVar msgRecvNtf 0
|
||||
@@ -234,7 +238,8 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
|
||||
weekCount psNtf,
|
||||
monthCount psNtf,
|
||||
show qCount',
|
||||
show msgCount'
|
||||
show msgCount',
|
||||
show msgExpired'
|
||||
]
|
||||
threadDelay' interval
|
||||
|
||||
@@ -758,7 +763,9 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ, sess
|
||||
expireMessages q = do
|
||||
msgExp <- asks $ messageExpiration . config
|
||||
old <- liftIO $ mapM expireBeforeEpoch msgExp
|
||||
atomically $ mapM_ (deleteExpiredMsgs q) old
|
||||
stats <- asks serverStats
|
||||
deleted <- atomically $ sum <$> mapM (deleteExpiredMsgs q) old
|
||||
atomically $ modifyTVar' (msgExpired stats) (+ deleted)
|
||||
|
||||
trySendNotification :: Message -> TVar ChaChaDRG -> STM ()
|
||||
trySendNotification msg ntfNonceDrg =
|
||||
@@ -892,24 +899,27 @@ saveServerMessages keepMsgs = asks (storeMsgsFile . config) >>= mapM_ saveMessag
|
||||
atomically (getMessages ms rId)
|
||||
>>= mapM_ (B.hPutStrLn h . strEncode . MLRv3 rId)
|
||||
|
||||
restoreServerMessages :: forall m. (MonadUnliftIO m, MonadReader Env m) => m ()
|
||||
restoreServerMessages = asks (storeMsgsFile . config) >>= mapM_ restoreMessages
|
||||
restoreServerMessages :: forall m. (MonadUnliftIO m, MonadReader Env m) => m Int
|
||||
restoreServerMessages = asks (storeMsgsFile . config) >>= \case
|
||||
Just f -> ifM (doesFileExist f) (restoreMessages f) (pure 0)
|
||||
Nothing -> pure 0
|
||||
where
|
||||
restoreMessages f = whenM (doesFileExist f) $ do
|
||||
restoreMessages f = do
|
||||
logInfo $ "restoring messages from file " <> T.pack f
|
||||
st <- asks queueStore
|
||||
ms <- asks msgStore
|
||||
quota <- asks $ msgQueueQuota . config
|
||||
old_ <- asks (messageExpiration . config) $>>= (liftIO . fmap Just . expireBeforeEpoch)
|
||||
runExceptT (liftIO (B.readFile f) >>= mapM_ (restoreMsg st ms quota old_) . B.lines) >>= \case
|
||||
runExceptT (liftIO (B.readFile f) >>= foldM (\expired -> restoreMsg expired st ms quota old_) 0 . B.lines) >>= \case
|
||||
Left e -> do
|
||||
logError . T.pack $ "error restoring messages: " <> e
|
||||
liftIO exitFailure
|
||||
_ -> do
|
||||
Right expired -> do
|
||||
renameFile f $ f <> ".bak"
|
||||
logInfo "messages restored"
|
||||
pure expired
|
||||
where
|
||||
restoreMsg st ms quota old_ s = do
|
||||
restoreMsg !expired st ms quota old_ s = do
|
||||
r <- liftEither . first (msgErr "parsing") $ strDecode s
|
||||
case r of
|
||||
MLRv3 rId msg -> addToMsgQueue rId msg
|
||||
@@ -919,14 +929,15 @@ restoreServerMessages = asks (storeMsgsFile . config) >>= mapM_ restoreMessages
|
||||
addToMsgQueue rId msg'
|
||||
where
|
||||
addToMsgQueue rId msg = do
|
||||
logFull <- atomically $ do
|
||||
(isExpired, logFull) <- atomically $ do
|
||||
q <- getMsgQueue ms rId quota
|
||||
case msg of
|
||||
Message {msgTs}
|
||||
| maybe True (systemSeconds msgTs >=) old_ -> isNothing <$> writeMsg q msg
|
||||
| otherwise -> pure False
|
||||
MessageQuota {} -> writeMsg q msg $> False
|
||||
| maybe True (systemSeconds msgTs >=) old_ -> (False,) . isNothing <$> writeMsg q msg
|
||||
| otherwise -> pure (True, False)
|
||||
MessageQuota {} -> writeMsg q msg $> (False, False)
|
||||
when logFull . logError . decodeLatin1 $ "message queue " <> strEncode rId <> " is full, message not restored: " <> strEncode (messageId msg)
|
||||
pure $ if isExpired then expired + 1 else expired
|
||||
updateMsgV1toV3 QueueRec {rcvDhSecret} RcvMessage {msgId, msgTs, msgFlags, msgBody = EncRcvMsgBody body} = do
|
||||
let nonce = C.cbNonce msgId
|
||||
msgBody <- liftEither . first (msgErr "v1 message decryption") $ C.maxLenBS =<< C.cbDecrypt rcvDhSecret nonce body
|
||||
@@ -944,19 +955,21 @@ saveServerStats =
|
||||
B.writeFile f $ strEncode stats
|
||||
logInfo "server stats saved"
|
||||
|
||||
restoreServerStats :: (MonadUnliftIO m, MonadReader Env m) => m ()
|
||||
restoreServerStats = asks (serverStatsBackupFile . config) >>= mapM_ restoreStats
|
||||
restoreServerStats :: (MonadUnliftIO m, MonadReader Env m) => Int -> m ()
|
||||
restoreServerStats expiredWhileRestoring = asks (serverStatsBackupFile . config) >>= mapM_ restoreStats
|
||||
where
|
||||
restoreStats f = whenM (doesFileExist f) $ do
|
||||
logInfo $ "restoring server stats from file " <> T.pack f
|
||||
liftIO (strDecode <$> B.readFile f) >>= \case
|
||||
Right d -> do
|
||||
Right d@ServerStatsData {_qCount = statsQCount} -> do
|
||||
s <- asks serverStats
|
||||
_qCount <- fmap (length . M.keys) . readTVarIO . queues =<< asks queueStore
|
||||
_msgCount <- foldM (\n q -> (n +) <$> readTVarIO (size q)) 0 =<< readTVarIO =<< asks msgStore
|
||||
atomically $ setServerStats s d {_qCount, _msgCount}
|
||||
_qCount <- fmap M.size . readTVarIO . queues =<< asks queueStore
|
||||
_msgCount <- foldM (\(!n) q -> (n +) <$> readTVarIO (size q)) 0 =<< readTVarIO =<< asks msgStore
|
||||
atomically $ setServerStats s d {_qCount, _msgCount, _msgExpired = _msgExpired d + expiredWhileRestoring}
|
||||
renameFile f $ f <> ".bak"
|
||||
logInfo "server stats restored"
|
||||
when (_qCount /= statsQCount) $ logWarn $ "Queue count differs: stats: " <> tshow statsQCount <> ", store: " <> tshow _qCount
|
||||
logInfo $ "Restored " <> tshow _msgCount <> " messages in " <> tshow _qCount <> " queues"
|
||||
Left e -> do
|
||||
logInfo $ "error restoring server stats: " <> T.pack e
|
||||
liftIO exitFailure
|
||||
|
||||
@@ -25,7 +25,6 @@ module Simplex.Messaging.Server.MsgStore.STM
|
||||
where
|
||||
|
||||
import Control.Concurrent.STM.TQueue (flushTQueue)
|
||||
import Control.Monad (when)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.Functor (($>))
|
||||
import Data.Int (Int64)
|
||||
@@ -116,15 +115,15 @@ tryDelPeekMsg mq msgId' =
|
||||
| otherwise -> pure (Nothing, msg_)
|
||||
_ -> pure (Nothing, Nothing)
|
||||
|
||||
deleteExpiredMsgs :: MsgQueue -> Int64 -> STM ()
|
||||
deleteExpiredMsgs mq old = loop
|
||||
deleteExpiredMsgs :: MsgQueue -> Int64 -> STM Int
|
||||
deleteExpiredMsgs mq old = loop 0
|
||||
where
|
||||
loop = tryPeekMsg mq >>= mapM_ delOldMsg
|
||||
delOldMsg = \case
|
||||
Message {msgTs} ->
|
||||
when (systemSeconds msgTs < old) $
|
||||
tryDeleteMsg mq >> loop
|
||||
_ -> pure ()
|
||||
loop dc =
|
||||
tryPeekMsg mq >>= \case
|
||||
Just Message {msgTs}
|
||||
| systemSeconds msgTs < old ->
|
||||
tryDeleteMsg mq >> loop (dc + 1)
|
||||
_ -> pure dc
|
||||
|
||||
tryDeleteMsg :: MsgQueue -> STM ()
|
||||
tryDeleteMsg MsgQueue {msgQueue = q, size} =
|
||||
|
||||
@@ -25,6 +25,7 @@ data ServerStats = ServerStats
|
||||
qDeleted :: TVar Int,
|
||||
msgSent :: TVar Int,
|
||||
msgRecv :: TVar Int,
|
||||
msgExpired :: TVar Int,
|
||||
activeQueues :: PeriodStats RecipientId,
|
||||
msgSentNtf :: TVar Int,
|
||||
msgRecvNtf :: TVar Int,
|
||||
@@ -40,6 +41,7 @@ data ServerStatsData = ServerStatsData
|
||||
_qDeleted :: Int,
|
||||
_msgSent :: Int,
|
||||
_msgRecv :: Int,
|
||||
_msgExpired :: Int,
|
||||
_activeQueues :: PeriodStatsData RecipientId,
|
||||
_msgSentNtf :: Int,
|
||||
_msgRecvNtf :: Int,
|
||||
@@ -57,13 +59,14 @@ newServerStats ts = do
|
||||
qDeleted <- newTVar 0
|
||||
msgSent <- newTVar 0
|
||||
msgRecv <- newTVar 0
|
||||
msgExpired <- newTVar 0
|
||||
activeQueues <- newPeriodStats
|
||||
msgSentNtf <- newTVar 0
|
||||
msgRecvNtf <- newTVar 0
|
||||
activeQueuesNtf <- newPeriodStats
|
||||
qCount <- newTVar 0
|
||||
msgCount <- newTVar 0
|
||||
pure ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount}
|
||||
pure ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount}
|
||||
|
||||
getServerStatsData :: ServerStats -> STM ServerStatsData
|
||||
getServerStatsData s = do
|
||||
@@ -73,13 +76,14 @@ getServerStatsData s = do
|
||||
_qDeleted <- readTVar $ qDeleted s
|
||||
_msgSent <- readTVar $ msgSent s
|
||||
_msgRecv <- readTVar $ msgRecv s
|
||||
_msgExpired <- readTVar $ msgExpired s
|
||||
_activeQueues <- getPeriodStatsData $ activeQueues s
|
||||
_msgSentNtf <- readTVar $ msgSentNtf s
|
||||
_msgRecvNtf <- readTVar $ msgRecvNtf s
|
||||
_activeQueuesNtf <- getPeriodStatsData $ activeQueuesNtf s
|
||||
_qCount <- readTVar $ qCount s
|
||||
_msgCount <- readTVar $ msgCount s
|
||||
pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _activeQueues, _msgSentNtf, _msgRecvNtf, _activeQueuesNtf, _qCount, _msgCount}
|
||||
pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _msgExpired, _activeQueues, _msgSentNtf, _msgRecvNtf, _activeQueuesNtf, _qCount, _msgCount}
|
||||
|
||||
setServerStats :: ServerStats -> ServerStatsData -> STM ()
|
||||
setServerStats s d = do
|
||||
@@ -89,6 +93,7 @@ setServerStats s d = do
|
||||
writeTVar (qDeleted s) $! _qDeleted d
|
||||
writeTVar (msgSent s) $! _msgSent d
|
||||
writeTVar (msgRecv s) $! _msgRecv d
|
||||
writeTVar (msgExpired s) $! _msgExpired d
|
||||
setPeriodStats (activeQueues s) (_activeQueues d)
|
||||
writeTVar (msgSentNtf s) $! _msgSentNtf d
|
||||
writeTVar (msgRecvNtf s) $! _msgRecvNtf d
|
||||
@@ -97,14 +102,16 @@ setServerStats s d = do
|
||||
writeTVar (msgCount s) $! _msgCount d
|
||||
|
||||
instance StrEncoding ServerStatsData where
|
||||
strEncode ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _msgSentNtf, _msgRecvNtf, _activeQueues, _activeQueuesNtf} =
|
||||
strEncode ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _msgExpired, _msgSentNtf, _msgRecvNtf, _activeQueues, _activeQueuesNtf, _qCount, _msgCount} =
|
||||
B.unlines
|
||||
[ "fromTime=" <> strEncode _fromTime,
|
||||
"qCreated=" <> strEncode _qCreated,
|
||||
"qSecured=" <> strEncode _qSecured,
|
||||
"qDeleted=" <> strEncode _qDeleted,
|
||||
"qCount=" <> strEncode _qCount,
|
||||
"msgSent=" <> strEncode _msgSent,
|
||||
"msgRecv=" <> strEncode _msgRecv,
|
||||
"msgExpired=" <> strEncode _msgExpired,
|
||||
"msgSentNtf=" <> strEncode _msgSentNtf,
|
||||
"msgRecvNtf=" <> strEncode _msgRecvNtf,
|
||||
"activeQueues:",
|
||||
@@ -117,8 +124,10 @@ instance StrEncoding ServerStatsData where
|
||||
_qCreated <- "qCreated=" *> strP <* A.endOfLine
|
||||
_qSecured <- "qSecured=" *> strP <* A.endOfLine
|
||||
_qDeleted <- "qDeleted=" *> strP <* A.endOfLine
|
||||
_qCount <- "qCount=" *> strP <* A.endOfLine <|> pure 0
|
||||
_msgSent <- "msgSent=" *> strP <* A.endOfLine
|
||||
_msgRecv <- "msgRecv=" *> strP <* A.endOfLine
|
||||
_msgExpired <- "msgExpired=" *> strP <* A.endOfLine <|> pure 0
|
||||
_msgSentNtf <- "msgSentNtf=" *> strP <* A.endOfLine <|> pure 0
|
||||
_msgRecvNtf <- "msgRecvNtf=" *> strP <* A.endOfLine <|> pure 0
|
||||
_activeQueues <-
|
||||
@@ -133,7 +142,7 @@ instance StrEncoding ServerStatsData where
|
||||
optional ("activeQueuesNtf:" <* A.endOfLine) >>= \case
|
||||
Just _ -> strP <* optional A.endOfLine
|
||||
_ -> pure newPeriodStatsData
|
||||
pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _msgSentNtf, _msgRecvNtf, _activeQueues, _activeQueuesNtf, _qCount = 0, _msgCount = 0}
|
||||
pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _msgExpired, _msgSentNtf, _msgRecvNtf, _activeQueues, _activeQueuesNtf, _qCount, _msgCount = 0}
|
||||
|
||||
data PeriodStats a = PeriodStats
|
||||
{ day :: TVar (Set a),
|
||||
|
||||
@@ -662,7 +662,7 @@ testRestoreMessages at@(ATransport t) =
|
||||
|
||||
logSize testStoreLogFile `shouldReturn` 2
|
||||
logSize testStoreMsgsFile `shouldReturn` 5
|
||||
logSize testServerStatsBackupFile `shouldReturn` 16
|
||||
logSize testServerStatsBackupFile `shouldReturn` 18
|
||||
Right stats1 <- strDecode <$> B.readFile testServerStatsBackupFile
|
||||
checkStats stats1 [rId] 5 1
|
||||
|
||||
@@ -680,7 +680,7 @@ testRestoreMessages at@(ATransport t) =
|
||||
logSize testStoreLogFile `shouldReturn` 1
|
||||
-- the last message is not removed because it was not ACK'd
|
||||
logSize testStoreMsgsFile `shouldReturn` 3
|
||||
logSize testServerStatsBackupFile `shouldReturn` 16
|
||||
logSize testServerStatsBackupFile `shouldReturn` 18
|
||||
Right stats2 <- strDecode <$> B.readFile testServerStatsBackupFile
|
||||
checkStats stats2 [rId] 5 3
|
||||
|
||||
@@ -699,7 +699,7 @@ testRestoreMessages at@(ATransport t) =
|
||||
|
||||
logSize testStoreLogFile `shouldReturn` 1
|
||||
logSize testStoreMsgsFile `shouldReturn` 0
|
||||
logSize testServerStatsBackupFile `shouldReturn` 16
|
||||
logSize testServerStatsBackupFile `shouldReturn` 18
|
||||
Right stats3 <- strDecode <$> B.readFile testServerStatsBackupFile
|
||||
checkStats stats3 [rId] 5 5
|
||||
|
||||
@@ -830,7 +830,7 @@ testRestoreExpireMessages at@(ATransport t) =
|
||||
length (B.lines msgs) `shouldBe` 4
|
||||
|
||||
let expCfg1 = Just ExpirationConfig {ttl = 86400, checkInterval = 43200}
|
||||
cfg1 = cfgV2 {storeLogFile = Just testStoreLogFile, storeMsgsFile = Just testStoreMsgsFile, messageExpiration = expCfg1}
|
||||
cfg1 = cfgV2 {storeLogFile = Just testStoreLogFile, storeMsgsFile = Just testStoreMsgsFile, messageExpiration = expCfg1, serverStatsBackupFile = Just testServerStatsBackupFile}
|
||||
withSmpServerConfigOn at cfg1 testPort . runTest t $ \_ -> pure ()
|
||||
|
||||
logSize testStoreLogFile `shouldReturn` 1
|
||||
@@ -838,7 +838,7 @@ testRestoreExpireMessages at@(ATransport t) =
|
||||
msgs' `shouldBe` msgs
|
||||
|
||||
let expCfg2 = Just ExpirationConfig {ttl = 2, checkInterval = 43200}
|
||||
cfg2 = cfgV2 {storeLogFile = Just testStoreLogFile, storeMsgsFile = Just testStoreMsgsFile, messageExpiration = expCfg2}
|
||||
cfg2 = cfgV2 {storeLogFile = Just testStoreLogFile, storeMsgsFile = Just testStoreMsgsFile, messageExpiration = expCfg2, serverStatsBackupFile = Just testServerStatsBackupFile}
|
||||
withSmpServerConfigOn at cfg2 testPort . runTest t $ \_ -> pure ()
|
||||
|
||||
logSize testStoreLogFile `shouldReturn` 1
|
||||
@@ -846,6 +846,8 @@ testRestoreExpireMessages at@(ATransport t) =
|
||||
msgs'' <- B.readFile testStoreMsgsFile
|
||||
length (B.lines msgs'') `shouldBe` 2
|
||||
B.lines msgs'' `shouldBe` drop 2 (B.lines msgs)
|
||||
Right ServerStatsData {_msgExpired} <- strDecode <$> B.readFile testServerStatsBackupFile
|
||||
_msgExpired `shouldBe` 2
|
||||
where
|
||||
runTest :: Transport c => TProxy c -> (THandle c -> IO ()) -> ThreadId -> Expectation
|
||||
runTest _ test' server = do
|
||||
|
||||
Reference in New Issue
Block a user