diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 9a9ddc482..21a019822 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -1,3 +1,4 @@ +{-# LANGUAGE BangPatterns #-} {-# LANGUAGE CPP #-} {-# LANGUAGE DataKinds #-} {-# LANGUAGE DuplicateRecordFields #-} @@ -118,8 +119,8 @@ type M a = ReaderT Env IO a smpServer :: TMVar Bool -> ServerConfig -> M () smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do s <- asks server - restoreServerMessages - restoreServerStats + expired <- restoreServerMessages + restoreServerStats expired raceAny_ ( serverThread s "server subscribedQ" subscribedQ subscribers subscriptions cancelSub : serverThread s "server ntfSubscribedQ" ntfSubscribedQ Env.notifiers ntfSubscriptions (\_ -> pure ()) @@ -178,14 +179,16 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do ms <- asks msgStore quota <- asks $ msgQueueQuota . config let interval = checkInterval expCfg * 1000000 + stats <- asks serverStats labelMyThread "expireMessages" forever $ do liftIO $ threadDelay' interval old <- liftIO $ expireBeforeEpoch expCfg rIds <- M.keysSet <$> readTVarIO ms - forM_ rIds $ \rId -> - atomically (getMsgQueue ms rId quota) - >>= atomically . (`deleteExpiredMsgs` old) + forM_ rIds $ \rId -> do + q <- atomically (getMsgQueue ms rId quota) + deleted <- atomically $ deleteExpiredMsgs q old + atomically $ modifyTVar' (msgExpired stats) (+ deleted) serverStatsThread_ :: ServerConfig -> [M ()] serverStatsThread_ ServerConfig {logStatsInterval = Just interval, logStatsStartTime, serverStatsLogFile} = @@ -198,7 +201,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime liftIO $ putStrLn $ "server stats log enabled: " <> statsFilePath liftIO $ threadDelay' $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0) - ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount} <- asks serverStats + ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount} <- asks serverStats let interval = 1000000 * logInterval withFile statsFilePath AppendMode $ \h -> liftIO $ do hSetBuffering h LineBuffering @@ -210,6 +213,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do qDeleted' <- atomically $ swapTVar qDeleted 0 msgSent' <- atomically $ swapTVar msgSent 0 msgRecv' <- atomically $ swapTVar msgRecv 0 + msgExpired' <- atomically $ swapTVar msgExpired 0 ps <- atomically $ periodStatCounts activeQueues ts msgSentNtf' <- atomically $ swapTVar msgSentNtf 0 msgRecvNtf' <- atomically $ swapTVar msgRecvNtf 0 @@ -234,7 +238,8 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do weekCount psNtf, monthCount psNtf, show qCount', - show msgCount' + show msgCount', + show msgExpired' ] threadDelay' interval @@ -758,7 +763,9 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ, sess expireMessages q = do msgExp <- asks $ messageExpiration . config old <- liftIO $ mapM expireBeforeEpoch msgExp - atomically $ mapM_ (deleteExpiredMsgs q) old + stats <- asks serverStats + deleted <- atomically $ sum <$> mapM (deleteExpiredMsgs q) old + atomically $ modifyTVar' (msgExpired stats) (+ deleted) trySendNotification :: Message -> TVar ChaChaDRG -> STM () trySendNotification msg ntfNonceDrg = @@ -892,24 +899,27 @@ saveServerMessages keepMsgs = asks (storeMsgsFile . config) >>= mapM_ saveMessag atomically (getMessages ms rId) >>= mapM_ (B.hPutStrLn h . strEncode . MLRv3 rId) -restoreServerMessages :: forall m. (MonadUnliftIO m, MonadReader Env m) => m () -restoreServerMessages = asks (storeMsgsFile . config) >>= mapM_ restoreMessages +restoreServerMessages :: forall m. (MonadUnliftIO m, MonadReader Env m) => m Int +restoreServerMessages = asks (storeMsgsFile . config) >>= \case + Just f -> ifM (doesFileExist f) (restoreMessages f) (pure 0) + Nothing -> pure 0 where - restoreMessages f = whenM (doesFileExist f) $ do + restoreMessages f = do logInfo $ "restoring messages from file " <> T.pack f st <- asks queueStore ms <- asks msgStore quota <- asks $ msgQueueQuota . config old_ <- asks (messageExpiration . config) $>>= (liftIO . fmap Just . expireBeforeEpoch) - runExceptT (liftIO (B.readFile f) >>= mapM_ (restoreMsg st ms quota old_) . B.lines) >>= \case + runExceptT (liftIO (B.readFile f) >>= foldM (\expired -> restoreMsg expired st ms quota old_) 0 . B.lines) >>= \case Left e -> do logError . T.pack $ "error restoring messages: " <> e liftIO exitFailure - _ -> do + Right expired -> do renameFile f $ f <> ".bak" logInfo "messages restored" + pure expired where - restoreMsg st ms quota old_ s = do + restoreMsg !expired st ms quota old_ s = do r <- liftEither . first (msgErr "parsing") $ strDecode s case r of MLRv3 rId msg -> addToMsgQueue rId msg @@ -919,14 +929,15 @@ restoreServerMessages = asks (storeMsgsFile . config) >>= mapM_ restoreMessages addToMsgQueue rId msg' where addToMsgQueue rId msg = do - logFull <- atomically $ do + (isExpired, logFull) <- atomically $ do q <- getMsgQueue ms rId quota case msg of Message {msgTs} - | maybe True (systemSeconds msgTs >=) old_ -> isNothing <$> writeMsg q msg - | otherwise -> pure False - MessageQuota {} -> writeMsg q msg $> False + | maybe True (systemSeconds msgTs >=) old_ -> (False,) . isNothing <$> writeMsg q msg + | otherwise -> pure (True, False) + MessageQuota {} -> writeMsg q msg $> (False, False) when logFull . logError . decodeLatin1 $ "message queue " <> strEncode rId <> " is full, message not restored: " <> strEncode (messageId msg) + pure $ if isExpired then expired + 1 else expired updateMsgV1toV3 QueueRec {rcvDhSecret} RcvMessage {msgId, msgTs, msgFlags, msgBody = EncRcvMsgBody body} = do let nonce = C.cbNonce msgId msgBody <- liftEither . first (msgErr "v1 message decryption") $ C.maxLenBS =<< C.cbDecrypt rcvDhSecret nonce body @@ -944,19 +955,21 @@ saveServerStats = B.writeFile f $ strEncode stats logInfo "server stats saved" -restoreServerStats :: (MonadUnliftIO m, MonadReader Env m) => m () -restoreServerStats = asks (serverStatsBackupFile . config) >>= mapM_ restoreStats +restoreServerStats :: (MonadUnliftIO m, MonadReader Env m) => Int -> m () +restoreServerStats expiredWhileRestoring = asks (serverStatsBackupFile . config) >>= mapM_ restoreStats where restoreStats f = whenM (doesFileExist f) $ do logInfo $ "restoring server stats from file " <> T.pack f liftIO (strDecode <$> B.readFile f) >>= \case - Right d -> do + Right d@ServerStatsData {_qCount = statsQCount} -> do s <- asks serverStats - _qCount <- fmap (length . M.keys) . readTVarIO . queues =<< asks queueStore - _msgCount <- foldM (\n q -> (n +) <$> readTVarIO (size q)) 0 =<< readTVarIO =<< asks msgStore - atomically $ setServerStats s d {_qCount, _msgCount} + _qCount <- fmap M.size . readTVarIO . queues =<< asks queueStore + _msgCount <- foldM (\(!n) q -> (n +) <$> readTVarIO (size q)) 0 =<< readTVarIO =<< asks msgStore + atomically $ setServerStats s d {_qCount, _msgCount, _msgExpired = _msgExpired d + expiredWhileRestoring} renameFile f $ f <> ".bak" logInfo "server stats restored" + when (_qCount /= statsQCount) $ logWarn $ "Queue count differs: stats: " <> tshow statsQCount <> ", store: " <> tshow _qCount + logInfo $ "Restored " <> tshow _msgCount <> " messages in " <> tshow _qCount <> " queues" Left e -> do logInfo $ "error restoring server stats: " <> T.pack e liftIO exitFailure diff --git a/src/Simplex/Messaging/Server/MsgStore/STM.hs b/src/Simplex/Messaging/Server/MsgStore/STM.hs index f5158ecc8..2d735d1d4 100644 --- a/src/Simplex/Messaging/Server/MsgStore/STM.hs +++ b/src/Simplex/Messaging/Server/MsgStore/STM.hs @@ -25,7 +25,6 @@ module Simplex.Messaging.Server.MsgStore.STM where import Control.Concurrent.STM.TQueue (flushTQueue) -import Control.Monad (when) import qualified Data.ByteString.Char8 as B import Data.Functor (($>)) import Data.Int (Int64) @@ -116,15 +115,15 @@ tryDelPeekMsg mq msgId' = | otherwise -> pure (Nothing, msg_) _ -> pure (Nothing, Nothing) -deleteExpiredMsgs :: MsgQueue -> Int64 -> STM () -deleteExpiredMsgs mq old = loop +deleteExpiredMsgs :: MsgQueue -> Int64 -> STM Int +deleteExpiredMsgs mq old = loop 0 where - loop = tryPeekMsg mq >>= mapM_ delOldMsg - delOldMsg = \case - Message {msgTs} -> - when (systemSeconds msgTs < old) $ - tryDeleteMsg mq >> loop - _ -> pure () + loop dc = + tryPeekMsg mq >>= \case + Just Message {msgTs} + | systemSeconds msgTs < old -> + tryDeleteMsg mq >> loop (dc + 1) + _ -> pure dc tryDeleteMsg :: MsgQueue -> STM () tryDeleteMsg MsgQueue {msgQueue = q, size} = diff --git a/src/Simplex/Messaging/Server/Stats.hs b/src/Simplex/Messaging/Server/Stats.hs index 38e1d13db..6bc54fac5 100644 --- a/src/Simplex/Messaging/Server/Stats.hs +++ b/src/Simplex/Messaging/Server/Stats.hs @@ -25,6 +25,7 @@ data ServerStats = ServerStats qDeleted :: TVar Int, msgSent :: TVar Int, msgRecv :: TVar Int, + msgExpired :: TVar Int, activeQueues :: PeriodStats RecipientId, msgSentNtf :: TVar Int, msgRecvNtf :: TVar Int, @@ -40,6 +41,7 @@ data ServerStatsData = ServerStatsData _qDeleted :: Int, _msgSent :: Int, _msgRecv :: Int, + _msgExpired :: Int, _activeQueues :: PeriodStatsData RecipientId, _msgSentNtf :: Int, _msgRecvNtf :: Int, @@ -57,13 +59,14 @@ newServerStats ts = do qDeleted <- newTVar 0 msgSent <- newTVar 0 msgRecv <- newTVar 0 + msgExpired <- newTVar 0 activeQueues <- newPeriodStats msgSentNtf <- newTVar 0 msgRecvNtf <- newTVar 0 activeQueuesNtf <- newPeriodStats qCount <- newTVar 0 msgCount <- newTVar 0 - pure ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount} + pure ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount} getServerStatsData :: ServerStats -> STM ServerStatsData getServerStatsData s = do @@ -73,13 +76,14 @@ getServerStatsData s = do _qDeleted <- readTVar $ qDeleted s _msgSent <- readTVar $ msgSent s _msgRecv <- readTVar $ msgRecv s + _msgExpired <- readTVar $ msgExpired s _activeQueues <- getPeriodStatsData $ activeQueues s _msgSentNtf <- readTVar $ msgSentNtf s _msgRecvNtf <- readTVar $ msgRecvNtf s _activeQueuesNtf <- getPeriodStatsData $ activeQueuesNtf s _qCount <- readTVar $ qCount s _msgCount <- readTVar $ msgCount s - pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _activeQueues, _msgSentNtf, _msgRecvNtf, _activeQueuesNtf, _qCount, _msgCount} + pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _msgExpired, _activeQueues, _msgSentNtf, _msgRecvNtf, _activeQueuesNtf, _qCount, _msgCount} setServerStats :: ServerStats -> ServerStatsData -> STM () setServerStats s d = do @@ -89,6 +93,7 @@ setServerStats s d = do writeTVar (qDeleted s) $! _qDeleted d writeTVar (msgSent s) $! _msgSent d writeTVar (msgRecv s) $! _msgRecv d + writeTVar (msgExpired s) $! _msgExpired d setPeriodStats (activeQueues s) (_activeQueues d) writeTVar (msgSentNtf s) $! _msgSentNtf d writeTVar (msgRecvNtf s) $! _msgRecvNtf d @@ -97,14 +102,16 @@ setServerStats s d = do writeTVar (msgCount s) $! _msgCount d instance StrEncoding ServerStatsData where - strEncode ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _msgSentNtf, _msgRecvNtf, _activeQueues, _activeQueuesNtf} = + strEncode ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _msgExpired, _msgSentNtf, _msgRecvNtf, _activeQueues, _activeQueuesNtf, _qCount, _msgCount} = B.unlines [ "fromTime=" <> strEncode _fromTime, "qCreated=" <> strEncode _qCreated, "qSecured=" <> strEncode _qSecured, "qDeleted=" <> strEncode _qDeleted, + "qCount=" <> strEncode _qCount, "msgSent=" <> strEncode _msgSent, "msgRecv=" <> strEncode _msgRecv, + "msgExpired=" <> strEncode _msgExpired, "msgSentNtf=" <> strEncode _msgSentNtf, "msgRecvNtf=" <> strEncode _msgRecvNtf, "activeQueues:", @@ -117,8 +124,10 @@ instance StrEncoding ServerStatsData where _qCreated <- "qCreated=" *> strP <* A.endOfLine _qSecured <- "qSecured=" *> strP <* A.endOfLine _qDeleted <- "qDeleted=" *> strP <* A.endOfLine + _qCount <- "qCount=" *> strP <* A.endOfLine <|> pure 0 _msgSent <- "msgSent=" *> strP <* A.endOfLine _msgRecv <- "msgRecv=" *> strP <* A.endOfLine + _msgExpired <- "msgExpired=" *> strP <* A.endOfLine <|> pure 0 _msgSentNtf <- "msgSentNtf=" *> strP <* A.endOfLine <|> pure 0 _msgRecvNtf <- "msgRecvNtf=" *> strP <* A.endOfLine <|> pure 0 _activeQueues <- @@ -133,7 +142,7 @@ instance StrEncoding ServerStatsData where optional ("activeQueuesNtf:" <* A.endOfLine) >>= \case Just _ -> strP <* optional A.endOfLine _ -> pure newPeriodStatsData - pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _msgSentNtf, _msgRecvNtf, _activeQueues, _activeQueuesNtf, _qCount = 0, _msgCount = 0} + pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _msgExpired, _msgSentNtf, _msgRecvNtf, _activeQueues, _activeQueuesNtf, _qCount, _msgCount = 0} data PeriodStats a = PeriodStats { day :: TVar (Set a), diff --git a/tests/ServerTests.hs b/tests/ServerTests.hs index 607c9ce9e..1097833d4 100644 --- a/tests/ServerTests.hs +++ b/tests/ServerTests.hs @@ -662,7 +662,7 @@ testRestoreMessages at@(ATransport t) = logSize testStoreLogFile `shouldReturn` 2 logSize testStoreMsgsFile `shouldReturn` 5 - logSize testServerStatsBackupFile `shouldReturn` 16 + logSize testServerStatsBackupFile `shouldReturn` 18 Right stats1 <- strDecode <$> B.readFile testServerStatsBackupFile checkStats stats1 [rId] 5 1 @@ -680,7 +680,7 @@ testRestoreMessages at@(ATransport t) = logSize testStoreLogFile `shouldReturn` 1 -- the last message is not removed because it was not ACK'd logSize testStoreMsgsFile `shouldReturn` 3 - logSize testServerStatsBackupFile `shouldReturn` 16 + logSize testServerStatsBackupFile `shouldReturn` 18 Right stats2 <- strDecode <$> B.readFile testServerStatsBackupFile checkStats stats2 [rId] 5 3 @@ -699,7 +699,7 @@ testRestoreMessages at@(ATransport t) = logSize testStoreLogFile `shouldReturn` 1 logSize testStoreMsgsFile `shouldReturn` 0 - logSize testServerStatsBackupFile `shouldReturn` 16 + logSize testServerStatsBackupFile `shouldReturn` 18 Right stats3 <- strDecode <$> B.readFile testServerStatsBackupFile checkStats stats3 [rId] 5 5 @@ -830,7 +830,7 @@ testRestoreExpireMessages at@(ATransport t) = length (B.lines msgs) `shouldBe` 4 let expCfg1 = Just ExpirationConfig {ttl = 86400, checkInterval = 43200} - cfg1 = cfgV2 {storeLogFile = Just testStoreLogFile, storeMsgsFile = Just testStoreMsgsFile, messageExpiration = expCfg1} + cfg1 = cfgV2 {storeLogFile = Just testStoreLogFile, storeMsgsFile = Just testStoreMsgsFile, messageExpiration = expCfg1, serverStatsBackupFile = Just testServerStatsBackupFile} withSmpServerConfigOn at cfg1 testPort . runTest t $ \_ -> pure () logSize testStoreLogFile `shouldReturn` 1 @@ -838,7 +838,7 @@ testRestoreExpireMessages at@(ATransport t) = msgs' `shouldBe` msgs let expCfg2 = Just ExpirationConfig {ttl = 2, checkInterval = 43200} - cfg2 = cfgV2 {storeLogFile = Just testStoreLogFile, storeMsgsFile = Just testStoreMsgsFile, messageExpiration = expCfg2} + cfg2 = cfgV2 {storeLogFile = Just testStoreLogFile, storeMsgsFile = Just testStoreMsgsFile, messageExpiration = expCfg2, serverStatsBackupFile = Just testServerStatsBackupFile} withSmpServerConfigOn at cfg2 testPort . runTest t $ \_ -> pure () logSize testStoreLogFile `shouldReturn` 1 @@ -846,6 +846,8 @@ testRestoreExpireMessages at@(ATransport t) = msgs'' <- B.readFile testStoreMsgsFile length (B.lines msgs'') `shouldBe` 2 B.lines msgs'' `shouldBe` drop 2 (B.lines msgs) + Right ServerStatsData {_msgExpired} <- strDecode <$> B.readFile testServerStatsBackupFile + _msgExpired `shouldBe` 2 where runTest :: Transport c => TProxy c -> (THandle c -> IO ()) -> ThreadId -> Expectation runTest _ test' server = do