From 9b38f69e7e28700883d44fb14d4822637cb46326 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Sun, 18 Feb 2024 16:25:32 +0000 Subject: [PATCH] smp server: add stats for queues deleted before and after they were secured (#1002) --- src/Simplex/Messaging/Server.hs | 46 +++++++++++-------- .../Messaging/Server/QueueStore/STM.hs | 4 +- src/Simplex/Messaging/Server/Stats.hs | 37 ++++++++++----- tests/ServerTests.hs | 10 ++-- 4 files changed, 61 insertions(+), 36 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index a85ce6cb8..2b965f158 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -204,7 +204,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime liftIO $ putStrLn $ "server stats log enabled: " <> statsFilePath liftIO $ threadDelay' $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0) - ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount} <- asks serverStats + ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedNew, qDeletedSecured, msgSent, msgRecv, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount} <- asks serverStats let interval = 1000000 * logInterval forever $ do withFile statsFilePath AppendMode $ \h -> liftIO $ do @@ -213,7 +213,9 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do fromTime' <- atomically $ swapTVar fromTime ts qCreated' <- atomically $ swapTVar qCreated 0 qSecured' <- atomically $ swapTVar qSecured 0 - qDeleted' <- atomically $ swapTVar qDeleted 0 + qDeletedAll' <- atomically $ swapTVar qDeletedAll 0 + qDeletedNew' <- atomically $ swapTVar qDeletedNew 0 + qDeletedSecured' <- atomically $ swapTVar qDeletedSecured 0 msgSent' <- atomically $ swapTVar msgSent 0 msgRecv' <- atomically $ swapTVar msgRecv 0 msgExpired' <- atomically $ swapTVar msgExpired 0 @@ -229,7 +231,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do [ iso8601Show $ utctDay fromTime', show qCreated', show qSecured', - show qDeleted', + show qDeletedAll', show msgSent', show msgRecv', dayCount ps, @@ -242,7 +244,9 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do monthCount psNtf, show qCount', show msgCount', - show msgExpired' + show msgExpired', + show qDeletedNew', + show qDeletedSecured' ] liftIO $ threadDelay' interval @@ -299,11 +303,13 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do subscriptions' <- bshow . M.size <$> readTVarIO subscriptions hPutStrLn h . B.unpack $ B.intercalate "," [bshow cid, encode sessionId, connected', strEncode createdAt, rcvActiveAt', sndActiveAt', bshow age, subscriptions'] CPStats -> do - ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, msgSentNtf, msgRecvNtf, qCount, msgCount} <- unliftIO u $ asks serverStats + ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedNew, qDeletedSecured, msgSent, msgRecv, msgSentNtf, msgRecvNtf, qCount, msgCount} <- unliftIO u $ asks serverStats putStat "fromTime" fromTime putStat "qCreated" qCreated putStat "qSecured" qSecured - putStat "qDeleted" qDeleted + putStat "qDeletedAll" qDeletedAll + putStat "qDeletedNew" qDeletedNew + putStat "qDeletedSecured" qDeletedSecured putStat "msgSent" msgSent putStat "msgRecv" msgRecv putStat "msgSentNtf" msgSentNtf @@ -350,19 +356,17 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do CPDelete queueId' -> unliftIO u $ do st <- asks queueStore ms <- asks msgStore - stats <- asks serverStats queueId <- atomically (getQueue st SSender queueId') >>= \case Left _ -> pure queueId' -- fallback to using as recipientId directly Right QueueRec {recipientId} -> pure recipientId r <- atomically $ - deleteQueue st queueId $>>= \() -> - Right <$> delMsgQueueSize ms queueId + deleteQueue st queueId $>>= \q -> + Right . (q,) <$> delMsgQueueSize ms queueId case r of Left e -> liftIO . hPutStrLn h $ "error: " <> show e - Right numDeleted -> do + Right (q, numDeleted) -> do withLog (`logDeleteQueue` queueId) - atomically $ modifyTVar' (qDeleted stats) (+ 1) - atomically $ modifyTVar' (qCount stats) (subtract 1) + updateDeletedStats q liftIO . hPutStrLn h $ "ok, " <> show numDeleted <> " messages deleted" CPSave -> withLock (savingLock srv) "control" $ do hPutStrLn h "saving server state..." @@ -878,13 +882,9 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId} Serv delQueueAndMsgs st = do withLog (`logDeleteQueue` queueId) ms <- asks msgStore - stats <- asks serverStats - atomically $ modifyTVar' (qDeleted stats) (+ 1) - atomically $ modifyTVar' (qCount stats) (subtract 1) - atomically $ - deleteQueue st queueId >>= \case - Left e -> pure $ err e - Right _ -> delMsgQueue ms queueId $> ok + atomically (deleteQueue st queueId $>>= \q -> delMsgQueue ms queueId $> Right q) >>= \case + Right q -> updateDeletedStats q $> ok + Left e -> pure $ err e ok :: Transmission BrokerMsg ok = (corrId, queueId, OK) @@ -895,6 +895,14 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId} Serv okResp :: Either ErrorType () -> Transmission BrokerMsg okResp = either err $ const ok +updateDeletedStats :: (MonadUnliftIO m, MonadReader Env m) => QueueRec -> m () +updateDeletedStats q = do + stats <- asks serverStats + let delSel = if isNothing (senderKey q) then qDeletedNew else qDeletedSecured + atomically $ modifyTVar' (delSel stats) (+ 1) + atomically $ modifyTVar' (qDeletedAll stats) (+ 1) + atomically $ modifyTVar' (qCount stats) (subtract 1) + withLog :: (MonadUnliftIO m, MonadReader Env m) => (StoreLog 'WriteMode -> IO a) -> m () withLog action = do env <- ask diff --git a/src/Simplex/Messaging/Server/QueueStore/STM.hs b/src/Simplex/Messaging/Server/QueueStore/STM.hs index 195955513..b76ad4998 100644 --- a/src/Simplex/Messaging/Server/QueueStore/STM.hs +++ b/src/Simplex/Messaging/Server/QueueStore/STM.hs @@ -94,14 +94,14 @@ suspendQueue :: QueueStore -> RecipientId -> STM (Either ErrorType ()) suspendQueue QueueStore {queues} rId = withQueue rId queues $ \qVar -> modifyTVar' qVar (\q -> q {status = QueueOff}) $> Just () -deleteQueue :: QueueStore -> RecipientId -> STM (Either ErrorType ()) +deleteQueue :: QueueStore -> RecipientId -> STM (Either ErrorType QueueRec) deleteQueue QueueStore {queues, senders, notifiers} rId = do TM.lookupDelete rId queues >>= \case Just qVar -> readTVar qVar >>= \q -> do TM.delete (senderId q) senders forM_ (notifier q) $ \NtfCreds {notifierId} -> TM.delete notifierId notifiers - pure $ Right () + pure $ Right q _ -> pure $ Left AUTH toResult :: Maybe a -> Either ErrorType a diff --git a/src/Simplex/Messaging/Server/Stats.hs b/src/Simplex/Messaging/Server/Stats.hs index 6bc54fac5..0b4c677c2 100644 --- a/src/Simplex/Messaging/Server/Stats.hs +++ b/src/Simplex/Messaging/Server/Stats.hs @@ -3,6 +3,7 @@ {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE PatternSynonyms #-} {-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TupleSections #-} module Simplex.Messaging.Server.Stats where @@ -22,7 +23,9 @@ data ServerStats = ServerStats { fromTime :: TVar UTCTime, qCreated :: TVar Int, qSecured :: TVar Int, - qDeleted :: TVar Int, + qDeletedAll :: TVar Int, + qDeletedNew :: TVar Int, + qDeletedSecured :: TVar Int, msgSent :: TVar Int, msgRecv :: TVar Int, msgExpired :: TVar Int, @@ -38,7 +41,9 @@ data ServerStatsData = ServerStatsData { _fromTime :: UTCTime, _qCreated :: Int, _qSecured :: Int, - _qDeleted :: Int, + _qDeletedAll :: Int, + _qDeletedNew :: Int, + _qDeletedSecured :: Int, _msgSent :: Int, _msgRecv :: Int, _msgExpired :: Int, @@ -56,7 +61,9 @@ newServerStats ts = do fromTime <- newTVar ts qCreated <- newTVar 0 qSecured <- newTVar 0 - qDeleted <- newTVar 0 + qDeletedAll <- newTVar 0 + qDeletedNew <- newTVar 0 + qDeletedSecured <- newTVar 0 msgSent <- newTVar 0 msgRecv <- newTVar 0 msgExpired <- newTVar 0 @@ -66,14 +73,16 @@ newServerStats ts = do activeQueuesNtf <- newPeriodStats qCount <- newTVar 0 msgCount <- newTVar 0 - pure ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount} + pure ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedNew, qDeletedSecured, msgSent, msgRecv, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount} getServerStatsData :: ServerStats -> STM ServerStatsData getServerStatsData s = do _fromTime <- readTVar $ fromTime s _qCreated <- readTVar $ qCreated s _qSecured <- readTVar $ qSecured s - _qDeleted <- readTVar $ qDeleted s + _qDeletedAll <- readTVar $ qDeletedAll s + _qDeletedNew <- readTVar $ qDeletedNew s + _qDeletedSecured <- readTVar $ qDeletedSecured s _msgSent <- readTVar $ msgSent s _msgRecv <- readTVar $ msgRecv s _msgExpired <- readTVar $ msgExpired s @@ -83,14 +92,16 @@ getServerStatsData s = do _activeQueuesNtf <- getPeriodStatsData $ activeQueuesNtf s _qCount <- readTVar $ qCount s _msgCount <- readTVar $ msgCount s - pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _msgExpired, _activeQueues, _msgSentNtf, _msgRecvNtf, _activeQueuesNtf, _qCount, _msgCount} + pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeletedAll, _qDeletedNew, _qDeletedSecured, _msgSent, _msgRecv, _msgExpired, _activeQueues, _msgSentNtf, _msgRecvNtf, _activeQueuesNtf, _qCount, _msgCount} setServerStats :: ServerStats -> ServerStatsData -> STM () setServerStats s d = do writeTVar (fromTime s) $! _fromTime d writeTVar (qCreated s) $! _qCreated d writeTVar (qSecured s) $! _qSecured d - writeTVar (qDeleted s) $! _qDeleted d + writeTVar (qDeletedAll s) $! _qDeletedAll d + writeTVar (qDeletedNew s) $! _qDeletedNew d + writeTVar (qDeletedSecured s) $! _qDeletedSecured d writeTVar (msgSent s) $! _msgSent d writeTVar (msgRecv s) $! _msgRecv d writeTVar (msgExpired s) $! _msgExpired d @@ -102,12 +113,14 @@ setServerStats s d = do writeTVar (msgCount s) $! _msgCount d instance StrEncoding ServerStatsData where - strEncode ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _msgExpired, _msgSentNtf, _msgRecvNtf, _activeQueues, _activeQueuesNtf, _qCount, _msgCount} = + strEncode ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeletedAll, _qDeletedNew, _qDeletedSecured, _msgSent, _msgRecv, _msgExpired, _msgSentNtf, _msgRecvNtf, _activeQueues, _activeQueuesNtf, _qCount, _msgCount} = B.unlines [ "fromTime=" <> strEncode _fromTime, "qCreated=" <> strEncode _qCreated, "qSecured=" <> strEncode _qSecured, - "qDeleted=" <> strEncode _qDeleted, + "qDeletedAll=" <> strEncode _qDeletedAll, + "qDeletedNew=" <> strEncode _qDeletedNew, + "qDeletedSecured=" <> strEncode _qDeletedSecured, "qCount=" <> strEncode _qCount, "msgSent=" <> strEncode _msgSent, "msgRecv=" <> strEncode _msgRecv, @@ -123,7 +136,9 @@ instance StrEncoding ServerStatsData where _fromTime <- "fromTime=" *> strP <* A.endOfLine _qCreated <- "qCreated=" *> strP <* A.endOfLine _qSecured <- "qSecured=" *> strP <* A.endOfLine - _qDeleted <- "qDeleted=" *> strP <* A.endOfLine + (_qDeletedAll, _qDeletedNew, _qDeletedSecured) <- + (,0,0) <$> ("qDeleted=" *> strP <* A.endOfLine) + <|> ((,,) <$> ("qDeletedAll=" *> strP <* A.endOfLine) <*> ("qDeletedNew=" *> strP <* A.endOfLine) <*> ("qDeletedSecured=" *> strP <* A.endOfLine)) _qCount <- "qCount=" *> strP <* A.endOfLine <|> pure 0 _msgSent <- "msgSent=" *> strP <* A.endOfLine _msgRecv <- "msgRecv=" *> strP <* A.endOfLine @@ -142,7 +157,7 @@ instance StrEncoding ServerStatsData where optional ("activeQueuesNtf:" <* A.endOfLine) >>= \case Just _ -> strP <* optional A.endOfLine _ -> pure newPeriodStatsData - pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _msgExpired, _msgSentNtf, _msgRecvNtf, _activeQueues, _activeQueuesNtf, _qCount, _msgCount = 0} + pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeletedAll, _qDeletedNew, _qDeletedSecured, _msgSent, _msgRecv, _msgExpired, _msgSentNtf, _msgRecvNtf, _activeQueues, _activeQueuesNtf, _qCount, _msgCount = 0} data PeriodStats a = PeriodStats { day :: TVar (Set a), diff --git a/tests/ServerTests.hs b/tests/ServerTests.hs index dc010c893..d6938fa0f 100644 --- a/tests/ServerTests.hs +++ b/tests/ServerTests.hs @@ -604,7 +604,7 @@ testRestoreMessages at@(ATransport t) = logSize testStoreLogFile `shouldReturn` 2 logSize testStoreMsgsFile `shouldReturn` 5 - logSize testServerStatsBackupFile `shouldReturn` 18 + logSize testServerStatsBackupFile `shouldReturn` 20 Right stats1 <- strDecode <$> B.readFile testServerStatsBackupFile checkStats stats1 [rId] 5 1 @@ -622,7 +622,7 @@ testRestoreMessages at@(ATransport t) = logSize testStoreLogFile `shouldReturn` 1 -- the last message is not removed because it was not ACK'd logSize testStoreMsgsFile `shouldReturn` 3 - logSize testServerStatsBackupFile `shouldReturn` 18 + logSize testServerStatsBackupFile `shouldReturn` 20 Right stats2 <- strDecode <$> B.readFile testServerStatsBackupFile checkStats stats2 [rId] 5 3 @@ -641,7 +641,7 @@ testRestoreMessages at@(ATransport t) = logSize testStoreLogFile `shouldReturn` 1 logSize testStoreMsgsFile `shouldReturn` 0 - logSize testServerStatsBackupFile `shouldReturn` 18 + logSize testServerStatsBackupFile `shouldReturn` 20 Right stats3 <- strDecode <$> B.readFile testServerStatsBackupFile checkStats stats3 [rId] 5 5 @@ -661,7 +661,9 @@ checkStats :: ServerStatsData -> [RecipientId] -> Int -> Int -> Expectation checkStats s qs sent received = do _qCreated s `shouldBe` length qs _qSecured s `shouldBe` length qs - _qDeleted s `shouldBe` 0 + _qDeletedAll s `shouldBe` 0 + _qDeletedNew s `shouldBe` 0 + _qDeletedSecured s `shouldBe` 0 _msgSent s `shouldBe` sent _msgRecv s `shouldBe` received _msgSentNtf s `shouldBe` 0