mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-14 22:45:06 +00:00
ntf stats on SMP server (#1197)
* log undelivered notifications * type * add counters and encoding * rename * diff * style * style2 --------- Co-authored-by: Alexander Bondarenko <486682+dpwiz@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
6597f6f0ed
commit
388d77b61a
@@ -229,7 +229,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
|
||||
initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime
|
||||
liftIO $ putStrLn $ "server stats log enabled: " <> statsFilePath
|
||||
liftIO $ threadDelay' $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0)
|
||||
ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedNew, qDeletedSecured, qSub, qSubAuth, qSubDuplicate, qSubProhibited, msgSent, msgSentAuth, msgSentQuota, msgSentLarge, msgRecv, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount, pRelays, pRelaysOwn, pMsgFwds, pMsgFwdsOwn, pMsgFwdsRecv} <- asks serverStats
|
||||
ss@ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedNew, qDeletedSecured, qSub, qSubAuth, qSubDuplicate, qSubProhibited, msgSent, msgSentAuth, msgSentQuota, msgSentLarge, msgRecv, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount, pRelays, pRelaysOwn, pMsgFwds, pMsgFwdsOwn, pMsgFwdsRecv} <- asks serverStats
|
||||
let interval = 1000000 * logInterval
|
||||
forever $ do
|
||||
withFile statsFilePath AppendMode $ \h -> liftIO $ do
|
||||
@@ -255,6 +255,9 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
|
||||
msgSentNtf' <- atomically $ swapTVar msgSentNtf 0
|
||||
msgRecvNtf' <- atomically $ swapTVar msgRecvNtf 0
|
||||
psNtf <- atomically $ periodStatCounts activeQueuesNtf ts
|
||||
msgNtfs' <- atomically $ swapTVar (msgNtfs ss) 0
|
||||
msgNtfNoSub' <- atomically $ swapTVar (msgNtfNoSub ss) 0
|
||||
msgNtfLost' <- atomically $ swapTVar (msgNtfLost ss) 0
|
||||
pRelays' <- atomically $ getResetProxyStatsData pRelays
|
||||
pRelaysOwn' <- atomically $ getResetProxyStatsData pRelaysOwn
|
||||
pMsgFwds' <- atomically $ getResetProxyStatsData pMsgFwds
|
||||
@@ -296,7 +299,10 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
|
||||
show qSubProhibited',
|
||||
show msgSentAuth',
|
||||
show msgSentQuota',
|
||||
show msgSentLarge'
|
||||
show msgSentLarge',
|
||||
show msgNtfs',
|
||||
show msgNtfNoSub',
|
||||
show msgNtfLost'
|
||||
]
|
||||
)
|
||||
liftIO $ threadDelay' interval
|
||||
@@ -1010,7 +1016,15 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
|
||||
pure $ err QUOTA
|
||||
Just msg -> time "SEND ok" $ do
|
||||
when (notification msgFlags) $ do
|
||||
atomically . trySendNotification msg =<< asks random
|
||||
forM_ (notifier qr) $ \ntf -> do
|
||||
asks random >>= atomically . trySendNotification ntf msg >>= \case
|
||||
Nothing -> do
|
||||
atomically $ modifyTVar' (msgNtfNoSub stats) (+ 1)
|
||||
logWarn "No notification subscription"
|
||||
Just False -> do
|
||||
atomically $ modifyTVar' (msgNtfLost stats) (+ 1)
|
||||
logWarn "Dropped message notification"
|
||||
Just True -> atomically $ modifyTVar' (msgNtfs stats) (+ 1)
|
||||
atomically $ modifyTVar' (msgSentNtf stats) (+ 1)
|
||||
atomically $ updatePeriodStats (activeQueuesNtf stats) (recipientId qr)
|
||||
atomically $ modifyTVar' (msgSent stats) (+ 1)
|
||||
@@ -1033,18 +1047,19 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
|
||||
deleted <- atomically $ sum <$> mapM (deleteExpiredMsgs q) old
|
||||
atomically $ modifyTVar' (msgExpired stats) (+ deleted)
|
||||
|
||||
trySendNotification :: Message -> TVar ChaChaDRG -> STM ()
|
||||
trySendNotification msg ntfNonceDrg =
|
||||
forM_ (notifier qr) $ \NtfCreds {notifierId, rcvNtfDhSecret} ->
|
||||
mapM_ (writeNtf notifierId msg rcvNtfDhSecret ntfNonceDrg) =<< TM.lookup notifierId notifiers
|
||||
trySendNotification :: NtfCreds -> Message -> TVar ChaChaDRG -> STM (Maybe Bool)
|
||||
trySendNotification NtfCreds {notifierId, rcvNtfDhSecret} msg ntfNonceDrg =
|
||||
mapM (writeNtf notifierId msg rcvNtfDhSecret ntfNonceDrg) =<< TM.lookup notifierId notifiers
|
||||
|
||||
writeNtf :: NotifierId -> Message -> RcvNtfDhSecret -> TVar ChaChaDRG -> Client -> STM ()
|
||||
writeNtf :: NotifierId -> Message -> RcvNtfDhSecret -> TVar ChaChaDRG -> Client -> STM Bool
|
||||
writeNtf nId msg rcvNtfDhSecret ntfNonceDrg Client {sndQ = q} =
|
||||
unlessM (isFullTBQueue q) $ case msg of
|
||||
Message {msgId, msgTs} -> do
|
||||
(nmsgNonce, encNMsgMeta) <- mkMessageNotification msgId msgTs rcvNtfDhSecret ntfNonceDrg
|
||||
writeTBQueue q [(CorrId "", nId, NMSG nmsgNonce encNMsgMeta)]
|
||||
_ -> pure ()
|
||||
ifM (isFullTBQueue q) (pure False) (sendNtf $> True)
|
||||
where
|
||||
sendNtf = case msg of
|
||||
Message {msgId, msgTs} -> do
|
||||
(nmsgNonce, encNMsgMeta) <- mkMessageNotification msgId msgTs rcvNtfDhSecret ntfNonceDrg
|
||||
writeTBQueue q [(CorrId "", nId, NMSG nmsgNonce encNMsgMeta)]
|
||||
_ -> pure ()
|
||||
|
||||
mkMessageNotification :: ByteString -> SystemTime -> RcvNtfDhSecret -> TVar ChaChaDRG -> STM (C.CbNonce, EncNMsgMeta)
|
||||
mkMessageNotification msgId msgTs rcvNtfDhSecret ntfNonceDrg = do
|
||||
|
||||
@@ -37,9 +37,12 @@ data ServerStats = ServerStats
|
||||
msgRecv :: TVar Int,
|
||||
msgExpired :: TVar Int,
|
||||
activeQueues :: PeriodStats RecipientId,
|
||||
msgSentNtf :: TVar Int,
|
||||
msgRecvNtf :: TVar Int,
|
||||
msgSentNtf :: TVar Int, -- sent messages with NTF flag
|
||||
msgRecvNtf :: TVar Int, -- received messages with NTF flag
|
||||
activeQueuesNtf :: PeriodStats RecipientId,
|
||||
msgNtfs :: TVar Int, -- messages notications delivered to NTF server (<= msgSentNtf)
|
||||
msgNtfNoSub :: TVar Int, -- no subscriber to notifications (e.g., NTF server not connected)
|
||||
msgNtfLost :: TVar Int, -- notification is lost because NTF delivery queue is full
|
||||
pRelays :: ProxyStats,
|
||||
pRelaysOwn :: ProxyStats,
|
||||
pMsgFwds :: ProxyStats,
|
||||
@@ -70,6 +73,9 @@ data ServerStatsData = ServerStatsData
|
||||
_msgSentNtf :: Int,
|
||||
_msgRecvNtf :: Int,
|
||||
_activeQueuesNtf :: PeriodStatsData RecipientId,
|
||||
_msgNtfs :: Int,
|
||||
_msgNtfNoSub :: Int,
|
||||
_msgNtfLost :: Int,
|
||||
_pRelays :: ProxyStatsData,
|
||||
_pRelaysOwn :: ProxyStatsData,
|
||||
_pMsgFwds :: ProxyStatsData,
|
||||
@@ -102,6 +108,9 @@ newServerStats ts = do
|
||||
msgSentNtf <- newTVar 0
|
||||
msgRecvNtf <- newTVar 0
|
||||
activeQueuesNtf <- newPeriodStats
|
||||
msgNtfs <- newTVar 0
|
||||
msgNtfNoSub <- newTVar 0
|
||||
msgNtfLost <- newTVar 0
|
||||
pRelays <- newProxyStats
|
||||
pRelaysOwn <- newProxyStats
|
||||
pMsgFwds <- newProxyStats
|
||||
@@ -109,7 +118,39 @@ newServerStats ts = do
|
||||
pMsgFwdsRecv <- newTVar 0
|
||||
qCount <- newTVar 0
|
||||
msgCount <- newTVar 0
|
||||
pure ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedNew, qDeletedSecured, qSub, qSubAuth, qSubDuplicate, qSubProhibited, msgSent, msgSentAuth, msgSentQuota, msgSentLarge, msgRecv, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, pRelays, pRelaysOwn, pMsgFwds, pMsgFwdsOwn, pMsgFwdsRecv, qCount, msgCount}
|
||||
pure
|
||||
ServerStats
|
||||
{ fromTime,
|
||||
qCreated,
|
||||
qSecured,
|
||||
qDeletedAll,
|
||||
qDeletedNew,
|
||||
qDeletedSecured,
|
||||
qSub,
|
||||
qSubAuth,
|
||||
qSubDuplicate,
|
||||
qSubProhibited,
|
||||
msgSent,
|
||||
msgSentAuth,
|
||||
msgSentQuota,
|
||||
msgSentLarge,
|
||||
msgRecv,
|
||||
msgExpired,
|
||||
activeQueues,
|
||||
msgSentNtf,
|
||||
msgRecvNtf,
|
||||
activeQueuesNtf,
|
||||
msgNtfs,
|
||||
msgNtfNoSub,
|
||||
msgNtfLost,
|
||||
pRelays,
|
||||
pRelaysOwn,
|
||||
pMsgFwds,
|
||||
pMsgFwdsOwn,
|
||||
pMsgFwdsRecv,
|
||||
qCount,
|
||||
msgCount
|
||||
}
|
||||
|
||||
getServerStatsData :: ServerStats -> STM ServerStatsData
|
||||
getServerStatsData s = do
|
||||
@@ -121,7 +162,7 @@ getServerStatsData s = do
|
||||
_qDeletedSecured <- readTVar $ qDeletedSecured s
|
||||
_qSub <- readTVar $ qSub s
|
||||
_qSubAuth <- readTVar $ qSubAuth s
|
||||
_qSubDuplicate <- readTVar $ qSubDuplicate s
|
||||
_qSubDuplicate <- readTVar $ qSubDuplicate s
|
||||
_qSubProhibited <- readTVar $ qSubProhibited s
|
||||
_msgSent <- readTVar $ msgSent s
|
||||
_msgSentAuth <- readTVar $ msgSentAuth s
|
||||
@@ -133,6 +174,9 @@ getServerStatsData s = do
|
||||
_msgSentNtf <- readTVar $ msgSentNtf s
|
||||
_msgRecvNtf <- readTVar $ msgRecvNtf s
|
||||
_activeQueuesNtf <- getPeriodStatsData $ activeQueuesNtf s
|
||||
_msgNtfs <- readTVar $ msgNtfs s
|
||||
_msgNtfNoSub <- readTVar $ msgNtfNoSub s
|
||||
_msgNtfLost <- readTVar $ msgNtfLost s
|
||||
_pRelays <- getProxyStatsData $ pRelays s
|
||||
_pRelaysOwn <- getProxyStatsData $ pRelaysOwn s
|
||||
_pMsgFwds <- getProxyStatsData $ pMsgFwds s
|
||||
@@ -140,7 +184,39 @@ getServerStatsData s = do
|
||||
_pMsgFwdsRecv <- readTVar $ pMsgFwdsRecv s
|
||||
_qCount <- readTVar $ qCount s
|
||||
_msgCount <- readTVar $ msgCount s
|
||||
pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeletedAll, _qDeletedNew, _qDeletedSecured, _qSub, _qSubAuth, _qSubDuplicate, _qSubProhibited, _msgSent, _msgSentAuth, _msgSentQuota, _msgSentLarge, _msgRecv, _msgExpired, _activeQueues, _msgSentNtf, _msgRecvNtf, _activeQueuesNtf, _pRelays, _pRelaysOwn, _pMsgFwds, _pMsgFwdsOwn, _pMsgFwdsRecv, _qCount, _msgCount}
|
||||
pure
|
||||
ServerStatsData
|
||||
{ _fromTime,
|
||||
_qCreated,
|
||||
_qSecured,
|
||||
_qDeletedAll,
|
||||
_qDeletedNew,
|
||||
_qDeletedSecured,
|
||||
_qSub,
|
||||
_qSubAuth,
|
||||
_qSubDuplicate,
|
||||
_qSubProhibited,
|
||||
_msgSent,
|
||||
_msgSentAuth,
|
||||
_msgSentQuota,
|
||||
_msgSentLarge,
|
||||
_msgRecv,
|
||||
_msgExpired,
|
||||
_activeQueues,
|
||||
_msgSentNtf,
|
||||
_msgRecvNtf,
|
||||
_activeQueuesNtf,
|
||||
_msgNtfs,
|
||||
_msgNtfNoSub,
|
||||
_msgNtfLost,
|
||||
_pRelays,
|
||||
_pRelaysOwn,
|
||||
_pMsgFwds,
|
||||
_pMsgFwdsOwn,
|
||||
_pMsgFwdsRecv,
|
||||
_qCount,
|
||||
_msgCount
|
||||
}
|
||||
|
||||
setServerStats :: ServerStats -> ServerStatsData -> STM ()
|
||||
setServerStats s d = do
|
||||
@@ -151,7 +227,7 @@ setServerStats s d = do
|
||||
writeTVar (qDeletedNew s) $! _qDeletedNew d
|
||||
writeTVar (qDeletedSecured s) $! _qDeletedSecured d
|
||||
writeTVar (qSub s) $! _qSub d
|
||||
writeTVar (qSubAuth s) $! _qSubAuth d
|
||||
writeTVar (qSubAuth s) $! _qSubAuth d
|
||||
writeTVar (qSubDuplicate s) $! _qSubDuplicate d
|
||||
writeTVar (qSubProhibited s) $! _qSubProhibited d
|
||||
writeTVar (msgSent s) $! _msgSent d
|
||||
@@ -164,6 +240,9 @@ setServerStats s d = do
|
||||
writeTVar (msgSentNtf s) $! _msgSentNtf d
|
||||
writeTVar (msgRecvNtf s) $! _msgRecvNtf d
|
||||
setPeriodStats (activeQueuesNtf s) (_activeQueuesNtf d)
|
||||
writeTVar (msgNtfs s) $! _msgNtfs d
|
||||
writeTVar (msgNtfNoSub s) $! _msgNtfNoSub d
|
||||
writeTVar (msgNtfLost s) $! _msgNtfLost d
|
||||
setProxyStats (pRelays s) $! _pRelays d
|
||||
setProxyStats (pRelaysOwn s) $! _pRelaysOwn d
|
||||
setProxyStats (pMsgFwds s) $! _pMsgFwds d
|
||||
@@ -194,6 +273,9 @@ instance StrEncoding ServerStatsData where
|
||||
"msgExpired=" <> strEncode (_msgExpired d),
|
||||
"msgSentNtf=" <> strEncode (_msgSentNtf d),
|
||||
"msgRecvNtf=" <> strEncode (_msgRecvNtf d),
|
||||
"msgNtfs=" <> strEncode (_msgNtfs d),
|
||||
"msgNtfNoSub=" <> strEncode (_msgNtfNoSub d),
|
||||
"msgNtfLost=" <> strEncode (_msgNtfLost d),
|
||||
"activeQueues:",
|
||||
strEncode (_activeQueues d),
|
||||
"activeQueuesNtf:",
|
||||
@@ -228,6 +310,9 @@ instance StrEncoding ServerStatsData where
|
||||
_msgExpired <- opt "msgExpired="
|
||||
_msgSentNtf <- opt "msgSentNtf="
|
||||
_msgRecvNtf <- opt "msgRecvNtf="
|
||||
_msgNtfs <- opt "msgNtfs="
|
||||
_msgNtfNoSub <- opt "msgNtfNoSub="
|
||||
_msgNtfLost <- opt "msgNtfLost="
|
||||
_activeQueues <-
|
||||
optional ("activeQueues:" <* A.endOfLine) >>= \case
|
||||
Just _ -> strP <* optional A.endOfLine
|
||||
@@ -245,7 +330,39 @@ instance StrEncoding ServerStatsData where
|
||||
_pMsgFwds <- proxyStatsP "pMsgFwds:"
|
||||
_pMsgFwdsOwn <- proxyStatsP "pMsgFwdsOwn:"
|
||||
_pMsgFwdsRecv <- opt "pMsgFwdsRecv="
|
||||
pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeletedAll, _qDeletedNew, _qDeletedSecured, _qSub, _qSubAuth, _qSubDuplicate, _qSubProhibited, _msgSent, _msgSentAuth, _msgSentQuota, _msgSentLarge, _msgRecv, _msgExpired, _msgSentNtf, _msgRecvNtf, _activeQueues, _activeQueuesNtf, _pRelays, _pRelaysOwn, _pMsgFwds, _pMsgFwdsOwn, _pMsgFwdsRecv, _qCount, _msgCount = 0}
|
||||
pure
|
||||
ServerStatsData
|
||||
{ _fromTime,
|
||||
_qCreated,
|
||||
_qSecured,
|
||||
_qDeletedAll,
|
||||
_qDeletedNew,
|
||||
_qDeletedSecured,
|
||||
_qSub,
|
||||
_qSubAuth,
|
||||
_qSubDuplicate,
|
||||
_qSubProhibited,
|
||||
_msgSent,
|
||||
_msgSentAuth,
|
||||
_msgSentQuota,
|
||||
_msgSentLarge,
|
||||
_msgRecv,
|
||||
_msgExpired,
|
||||
_msgSentNtf,
|
||||
_msgRecvNtf,
|
||||
_msgNtfs,
|
||||
_msgNtfNoSub,
|
||||
_msgNtfLost,
|
||||
_activeQueues,
|
||||
_activeQueuesNtf,
|
||||
_pRelays,
|
||||
_pRelaysOwn,
|
||||
_pMsgFwds,
|
||||
_pMsgFwdsOwn,
|
||||
_pMsgFwdsRecv,
|
||||
_qCount,
|
||||
_msgCount = 0
|
||||
}
|
||||
where
|
||||
opt s = A.string s *> strP <* A.endOfLine <|> pure 0
|
||||
proxyStatsP key =
|
||||
|
||||
@@ -608,7 +608,7 @@ testRestoreMessages at@(ATransport t) =
|
||||
|
||||
logSize testStoreLogFile `shouldReturn` 2
|
||||
logSize testStoreMsgsFile `shouldReturn` 5
|
||||
logSize testServerStatsBackupFile `shouldReturn` 52
|
||||
logSize testServerStatsBackupFile `shouldReturn` 55
|
||||
Right stats1 <- strDecode <$> B.readFile testServerStatsBackupFile
|
||||
checkStats stats1 [rId] 5 1
|
||||
|
||||
@@ -626,7 +626,7 @@ testRestoreMessages at@(ATransport t) =
|
||||
logSize testStoreLogFile `shouldReturn` 1
|
||||
-- the last message is not removed because it was not ACK'd
|
||||
logSize testStoreMsgsFile `shouldReturn` 3
|
||||
logSize testServerStatsBackupFile `shouldReturn` 52
|
||||
logSize testServerStatsBackupFile `shouldReturn` 55
|
||||
Right stats2 <- strDecode <$> B.readFile testServerStatsBackupFile
|
||||
checkStats stats2 [rId] 5 3
|
||||
|
||||
@@ -645,7 +645,7 @@ testRestoreMessages at@(ATransport t) =
|
||||
|
||||
logSize testStoreLogFile `shouldReturn` 1
|
||||
logSize testStoreMsgsFile `shouldReturn` 0
|
||||
logSize testServerStatsBackupFile `shouldReturn` 52
|
||||
logSize testServerStatsBackupFile `shouldReturn` 55
|
||||
Right stats3 <- strDecode <$> B.readFile testServerStatsBackupFile
|
||||
checkStats stats3 [rId] 5 5
|
||||
|
||||
|
||||
Reference in New Issue
Block a user