diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index bbdeab340..dfb4973ea 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -229,7 +229,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime liftIO $ putStrLn $ "server stats log enabled: " <> statsFilePath liftIO $ threadDelay' $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0) - ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedNew, qDeletedSecured, qSub, qSubAuth, qSubDuplicate, qSubProhibited, msgSent, msgSentAuth, msgSentQuota, msgSentLarge, msgRecv, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount, pRelays, pRelaysOwn, pMsgFwds, pMsgFwdsOwn, pMsgFwdsRecv} <- asks serverStats + ss@ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedNew, qDeletedSecured, qSub, qSubAuth, qSubDuplicate, qSubProhibited, msgSent, msgSentAuth, msgSentQuota, msgSentLarge, msgRecv, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount, pRelays, pRelaysOwn, pMsgFwds, pMsgFwdsOwn, pMsgFwdsRecv} <- asks serverStats let interval = 1000000 * logInterval forever $ do withFile statsFilePath AppendMode $ \h -> liftIO $ do @@ -255,6 +255,9 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do msgSentNtf' <- atomically $ swapTVar msgSentNtf 0 msgRecvNtf' <- atomically $ swapTVar msgRecvNtf 0 psNtf <- atomically $ periodStatCounts activeQueuesNtf ts + msgNtfs' <- atomically $ swapTVar (msgNtfs ss) 0 + msgNtfNoSub' <- atomically $ swapTVar (msgNtfNoSub ss) 0 + msgNtfLost' <- atomically $ swapTVar (msgNtfLost ss) 0 pRelays' <- atomically $ getResetProxyStatsData pRelays pRelaysOwn' <- atomically $ getResetProxyStatsData pRelaysOwn pMsgFwds' <- atomically $ getResetProxyStatsData pMsgFwds @@ -296,7 +299,10 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do show qSubProhibited', show msgSentAuth', show msgSentQuota', - show msgSentLarge' + show msgSentLarge', + show msgNtfs', + show msgNtfNoSub', + show msgNtfLost' ] ) liftIO $ threadDelay' interval @@ -1010,7 +1016,15 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi pure $ err QUOTA Just msg -> time "SEND ok" $ do when (notification msgFlags) $ do - atomically . trySendNotification msg =<< asks random + forM_ (notifier qr) $ \ntf -> do + asks random >>= atomically . trySendNotification ntf msg >>= \case + Nothing -> do + atomically $ modifyTVar' (msgNtfNoSub stats) (+ 1) + logWarn "No notification subscription" + Just False -> do + atomically $ modifyTVar' (msgNtfLost stats) (+ 1) + logWarn "Dropped message notification" + Just True -> atomically $ modifyTVar' (msgNtfs stats) (+ 1) atomically $ modifyTVar' (msgSentNtf stats) (+ 1) atomically $ updatePeriodStats (activeQueuesNtf stats) (recipientId qr) atomically $ modifyTVar' (msgSent stats) (+ 1) @@ -1033,18 +1047,19 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi deleted <- atomically $ sum <$> mapM (deleteExpiredMsgs q) old atomically $ modifyTVar' (msgExpired stats) (+ deleted) - trySendNotification :: Message -> TVar ChaChaDRG -> STM () - trySendNotification msg ntfNonceDrg = - forM_ (notifier qr) $ \NtfCreds {notifierId, rcvNtfDhSecret} -> - mapM_ (writeNtf notifierId msg rcvNtfDhSecret ntfNonceDrg) =<< TM.lookup notifierId notifiers + trySendNotification :: NtfCreds -> Message -> TVar ChaChaDRG -> STM (Maybe Bool) + trySendNotification NtfCreds {notifierId, rcvNtfDhSecret} msg ntfNonceDrg = + mapM (writeNtf notifierId msg rcvNtfDhSecret ntfNonceDrg) =<< TM.lookup notifierId notifiers - writeNtf :: NotifierId -> Message -> RcvNtfDhSecret -> TVar ChaChaDRG -> Client -> STM () + writeNtf :: NotifierId -> Message -> RcvNtfDhSecret -> TVar ChaChaDRG -> Client -> STM Bool writeNtf nId msg rcvNtfDhSecret ntfNonceDrg Client {sndQ = q} = - unlessM (isFullTBQueue q) $ case msg of - Message {msgId, msgTs} -> do - (nmsgNonce, encNMsgMeta) <- mkMessageNotification msgId msgTs rcvNtfDhSecret ntfNonceDrg - writeTBQueue q [(CorrId "", nId, NMSG nmsgNonce encNMsgMeta)] - _ -> pure () + ifM (isFullTBQueue q) (pure False) (sendNtf $> True) + where + sendNtf = case msg of + Message {msgId, msgTs} -> do + (nmsgNonce, encNMsgMeta) <- mkMessageNotification msgId msgTs rcvNtfDhSecret ntfNonceDrg + writeTBQueue q [(CorrId "", nId, NMSG nmsgNonce encNMsgMeta)] + _ -> pure () mkMessageNotification :: ByteString -> SystemTime -> RcvNtfDhSecret -> TVar ChaChaDRG -> STM (C.CbNonce, EncNMsgMeta) mkMessageNotification msgId msgTs rcvNtfDhSecret ntfNonceDrg = do diff --git a/src/Simplex/Messaging/Server/Stats.hs b/src/Simplex/Messaging/Server/Stats.hs index 880791c3d..f2716c9c3 100644 --- a/src/Simplex/Messaging/Server/Stats.hs +++ b/src/Simplex/Messaging/Server/Stats.hs @@ -37,9 +37,12 @@ data ServerStats = ServerStats msgRecv :: TVar Int, msgExpired :: TVar Int, activeQueues :: PeriodStats RecipientId, - msgSentNtf :: TVar Int, - msgRecvNtf :: TVar Int, + msgSentNtf :: TVar Int, -- sent messages with NTF flag + msgRecvNtf :: TVar Int, -- received messages with NTF flag activeQueuesNtf :: PeriodStats RecipientId, + msgNtfs :: TVar Int, -- messages notications delivered to NTF server (<= msgSentNtf) + msgNtfNoSub :: TVar Int, -- no subscriber to notifications (e.g., NTF server not connected) + msgNtfLost :: TVar Int, -- notification is lost because NTF delivery queue is full pRelays :: ProxyStats, pRelaysOwn :: ProxyStats, pMsgFwds :: ProxyStats, @@ -70,6 +73,9 @@ data ServerStatsData = ServerStatsData _msgSentNtf :: Int, _msgRecvNtf :: Int, _activeQueuesNtf :: PeriodStatsData RecipientId, + _msgNtfs :: Int, + _msgNtfNoSub :: Int, + _msgNtfLost :: Int, _pRelays :: ProxyStatsData, _pRelaysOwn :: ProxyStatsData, _pMsgFwds :: ProxyStatsData, @@ -102,6 +108,9 @@ newServerStats ts = do msgSentNtf <- newTVar 0 msgRecvNtf <- newTVar 0 activeQueuesNtf <- newPeriodStats + msgNtfs <- newTVar 0 + msgNtfNoSub <- newTVar 0 + msgNtfLost <- newTVar 0 pRelays <- newProxyStats pRelaysOwn <- newProxyStats pMsgFwds <- newProxyStats @@ -109,7 +118,39 @@ newServerStats ts = do pMsgFwdsRecv <- newTVar 0 qCount <- newTVar 0 msgCount <- newTVar 0 - pure ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedNew, qDeletedSecured, qSub, qSubAuth, qSubDuplicate, qSubProhibited, msgSent, msgSentAuth, msgSentQuota, msgSentLarge, msgRecv, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, pRelays, pRelaysOwn, pMsgFwds, pMsgFwdsOwn, pMsgFwdsRecv, qCount, msgCount} + pure + ServerStats + { fromTime, + qCreated, + qSecured, + qDeletedAll, + qDeletedNew, + qDeletedSecured, + qSub, + qSubAuth, + qSubDuplicate, + qSubProhibited, + msgSent, + msgSentAuth, + msgSentQuota, + msgSentLarge, + msgRecv, + msgExpired, + activeQueues, + msgSentNtf, + msgRecvNtf, + activeQueuesNtf, + msgNtfs, + msgNtfNoSub, + msgNtfLost, + pRelays, + pRelaysOwn, + pMsgFwds, + pMsgFwdsOwn, + pMsgFwdsRecv, + qCount, + msgCount + } getServerStatsData :: ServerStats -> STM ServerStatsData getServerStatsData s = do @@ -121,7 +162,7 @@ getServerStatsData s = do _qDeletedSecured <- readTVar $ qDeletedSecured s _qSub <- readTVar $ qSub s _qSubAuth <- readTVar $ qSubAuth s - _qSubDuplicate <- readTVar $ qSubDuplicate s + _qSubDuplicate <- readTVar $ qSubDuplicate s _qSubProhibited <- readTVar $ qSubProhibited s _msgSent <- readTVar $ msgSent s _msgSentAuth <- readTVar $ msgSentAuth s @@ -133,6 +174,9 @@ getServerStatsData s = do _msgSentNtf <- readTVar $ msgSentNtf s _msgRecvNtf <- readTVar $ msgRecvNtf s _activeQueuesNtf <- getPeriodStatsData $ activeQueuesNtf s + _msgNtfs <- readTVar $ msgNtfs s + _msgNtfNoSub <- readTVar $ msgNtfNoSub s + _msgNtfLost <- readTVar $ msgNtfLost s _pRelays <- getProxyStatsData $ pRelays s _pRelaysOwn <- getProxyStatsData $ pRelaysOwn s _pMsgFwds <- getProxyStatsData $ pMsgFwds s @@ -140,7 +184,39 @@ getServerStatsData s = do _pMsgFwdsRecv <- readTVar $ pMsgFwdsRecv s _qCount <- readTVar $ qCount s _msgCount <- readTVar $ msgCount s - pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeletedAll, _qDeletedNew, _qDeletedSecured, _qSub, _qSubAuth, _qSubDuplicate, _qSubProhibited, _msgSent, _msgSentAuth, _msgSentQuota, _msgSentLarge, _msgRecv, _msgExpired, _activeQueues, _msgSentNtf, _msgRecvNtf, _activeQueuesNtf, _pRelays, _pRelaysOwn, _pMsgFwds, _pMsgFwdsOwn, _pMsgFwdsRecv, _qCount, _msgCount} + pure + ServerStatsData + { _fromTime, + _qCreated, + _qSecured, + _qDeletedAll, + _qDeletedNew, + _qDeletedSecured, + _qSub, + _qSubAuth, + _qSubDuplicate, + _qSubProhibited, + _msgSent, + _msgSentAuth, + _msgSentQuota, + _msgSentLarge, + _msgRecv, + _msgExpired, + _activeQueues, + _msgSentNtf, + _msgRecvNtf, + _activeQueuesNtf, + _msgNtfs, + _msgNtfNoSub, + _msgNtfLost, + _pRelays, + _pRelaysOwn, + _pMsgFwds, + _pMsgFwdsOwn, + _pMsgFwdsRecv, + _qCount, + _msgCount + } setServerStats :: ServerStats -> ServerStatsData -> STM () setServerStats s d = do @@ -151,7 +227,7 @@ setServerStats s d = do writeTVar (qDeletedNew s) $! _qDeletedNew d writeTVar (qDeletedSecured s) $! _qDeletedSecured d writeTVar (qSub s) $! _qSub d - writeTVar (qSubAuth s) $! _qSubAuth d + writeTVar (qSubAuth s) $! _qSubAuth d writeTVar (qSubDuplicate s) $! _qSubDuplicate d writeTVar (qSubProhibited s) $! _qSubProhibited d writeTVar (msgSent s) $! _msgSent d @@ -164,6 +240,9 @@ setServerStats s d = do writeTVar (msgSentNtf s) $! _msgSentNtf d writeTVar (msgRecvNtf s) $! _msgRecvNtf d setPeriodStats (activeQueuesNtf s) (_activeQueuesNtf d) + writeTVar (msgNtfs s) $! _msgNtfs d + writeTVar (msgNtfNoSub s) $! _msgNtfNoSub d + writeTVar (msgNtfLost s) $! _msgNtfLost d setProxyStats (pRelays s) $! _pRelays d setProxyStats (pRelaysOwn s) $! _pRelaysOwn d setProxyStats (pMsgFwds s) $! _pMsgFwds d @@ -194,6 +273,9 @@ instance StrEncoding ServerStatsData where "msgExpired=" <> strEncode (_msgExpired d), "msgSentNtf=" <> strEncode (_msgSentNtf d), "msgRecvNtf=" <> strEncode (_msgRecvNtf d), + "msgNtfs=" <> strEncode (_msgNtfs d), + "msgNtfNoSub=" <> strEncode (_msgNtfNoSub d), + "msgNtfLost=" <> strEncode (_msgNtfLost d), "activeQueues:", strEncode (_activeQueues d), "activeQueuesNtf:", @@ -228,6 +310,9 @@ instance StrEncoding ServerStatsData where _msgExpired <- opt "msgExpired=" _msgSentNtf <- opt "msgSentNtf=" _msgRecvNtf <- opt "msgRecvNtf=" + _msgNtfs <- opt "msgNtfs=" + _msgNtfNoSub <- opt "msgNtfNoSub=" + _msgNtfLost <- opt "msgNtfLost=" _activeQueues <- optional ("activeQueues:" <* A.endOfLine) >>= \case Just _ -> strP <* optional A.endOfLine @@ -245,7 +330,39 @@ instance StrEncoding ServerStatsData where _pMsgFwds <- proxyStatsP "pMsgFwds:" _pMsgFwdsOwn <- proxyStatsP "pMsgFwdsOwn:" _pMsgFwdsRecv <- opt "pMsgFwdsRecv=" - pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeletedAll, _qDeletedNew, _qDeletedSecured, _qSub, _qSubAuth, _qSubDuplicate, _qSubProhibited, _msgSent, _msgSentAuth, _msgSentQuota, _msgSentLarge, _msgRecv, _msgExpired, _msgSentNtf, _msgRecvNtf, _activeQueues, _activeQueuesNtf, _pRelays, _pRelaysOwn, _pMsgFwds, _pMsgFwdsOwn, _pMsgFwdsRecv, _qCount, _msgCount = 0} + pure + ServerStatsData + { _fromTime, + _qCreated, + _qSecured, + _qDeletedAll, + _qDeletedNew, + _qDeletedSecured, + _qSub, + _qSubAuth, + _qSubDuplicate, + _qSubProhibited, + _msgSent, + _msgSentAuth, + _msgSentQuota, + _msgSentLarge, + _msgRecv, + _msgExpired, + _msgSentNtf, + _msgRecvNtf, + _msgNtfs, + _msgNtfNoSub, + _msgNtfLost, + _activeQueues, + _activeQueuesNtf, + _pRelays, + _pRelaysOwn, + _pMsgFwds, + _pMsgFwdsOwn, + _pMsgFwdsRecv, + _qCount, + _msgCount = 0 + } where opt s = A.string s *> strP <* A.endOfLine <|> pure 0 proxyStatsP key = diff --git a/tests/ServerTests.hs b/tests/ServerTests.hs index e2ca278d9..a124a42e4 100644 --- a/tests/ServerTests.hs +++ b/tests/ServerTests.hs @@ -608,7 +608,7 @@ testRestoreMessages at@(ATransport t) = logSize testStoreLogFile `shouldReturn` 2 logSize testStoreMsgsFile `shouldReturn` 5 - logSize testServerStatsBackupFile `shouldReturn` 52 + logSize testServerStatsBackupFile `shouldReturn` 55 Right stats1 <- strDecode <$> B.readFile testServerStatsBackupFile checkStats stats1 [rId] 5 1 @@ -626,7 +626,7 @@ testRestoreMessages at@(ATransport t) = logSize testStoreLogFile `shouldReturn` 1 -- the last message is not removed because it was not ACK'd logSize testStoreMsgsFile `shouldReturn` 3 - logSize testServerStatsBackupFile `shouldReturn` 52 + logSize testServerStatsBackupFile `shouldReturn` 55 Right stats2 <- strDecode <$> B.readFile testServerStatsBackupFile checkStats stats2 [rId] 5 3 @@ -645,7 +645,7 @@ testRestoreMessages at@(ATransport t) = logSize testStoreLogFile `shouldReturn` 1 logSize testStoreMsgsFile `shouldReturn` 0 - logSize testServerStatsBackupFile `shouldReturn` 52 + logSize testServerStatsBackupFile `shouldReturn` 55 Right stats3 <- strDecode <$> B.readFile testServerStatsBackupFile checkStats stats3 [rId] 5 5