diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index adf0b5df7..0d209229c 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -228,7 +228,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime liftIO $ putStrLn $ "server stats log enabled: " <> statsFilePath liftIO $ threadDelay' $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0) - ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedNew, qDeletedSecured, msgSent, msgRecv, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount, pRelays, pRelaysOwn, pMsgFwds, pMsgFwdsOwn, pMsgFwdsRecv} <- asks serverStats + ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedNew, qDeletedSecured, qSub, qSubAuth, qSubDuplicate, qSubProhibited, msgSent, msgSentAuth, msgSentQuota, msgSentLarge, msgRecv, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount, pRelays, pRelaysOwn, pMsgFwds, pMsgFwdsOwn, pMsgFwdsRecv} <- asks serverStats let interval = 1000000 * logInterval forever $ do withFile statsFilePath AppendMode $ \h -> liftIO $ do @@ -240,7 +240,14 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do qDeletedAll' <- atomically $ swapTVar qDeletedAll 0 qDeletedNew' <- atomically $ swapTVar qDeletedNew 0 qDeletedSecured' <- atomically $ swapTVar qDeletedSecured 0 + qSub' <- atomically $ swapTVar qSub 0 + qSubAuth' <- atomically $ swapTVar qSubAuth 0 + qSubDuplicate' <- atomically $ swapTVar qSubDuplicate 0 + qSubProhibited' <- atomically $ swapTVar qSubProhibited 0 msgSent' <- atomically $ swapTVar msgSent 0 + msgSentAuth' <- atomically $ swapTVar msgSentAuth 0 + msgSentQuota' <- atomically $ swapTVar msgSentQuota 0 + msgSentLarge' <- atomically $ swapTVar msgSentLarge 0 msgRecv' <- atomically $ swapTVar msgRecv 0 msgExpired' <- atomically $ swapTVar msgExpired 0 ps <- atomically $ periodStatCounts activeQueues ts @@ -281,7 +288,15 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do <> showProxyStats pRelaysOwn' <> showProxyStats pMsgFwds' <> showProxyStats pMsgFwdsOwn' - <> [show pMsgFwdsRecv'] + <> [ show pMsgFwdsRecv', + show qSub', + show qSubAuth', + show qSubDuplicate', + show qSubProhibited', + show msgSentAuth', + show msgSentQuota', + show msgSentLarge' + ] ) liftIO $ threadDelay' interval where @@ -504,19 +519,25 @@ receive h@THandle {params = THandleParams {thAuth}} Client {rcvQ, sndQ, rcvActiv forever $ do ts <- L.toList <$> liftIO (tGet h) atomically . writeTVar rcvActiveAt =<< liftIO getSystemTime - (errs, cmds) <- partitionEithers <$> mapM cmdAction ts + stats <- asks serverStats + (errs, cmds) <- partitionEithers <$> mapM (cmdAction stats) ts write sndQ errs write rcvQ cmds where - cmdAction :: SignedTransmission ErrorType Cmd -> M (Either (Transmission BrokerMsg) (Maybe QueueRec, Transmission Cmd)) - cmdAction (tAuth, authorized, (corrId, entId, cmdOrError)) = + cmdAction :: ServerStats -> SignedTransmission ErrorType Cmd -> M (Either (Transmission BrokerMsg) (Maybe QueueRec, Transmission Cmd)) + cmdAction stats (tAuth, authorized, (corrId, entId, cmdOrError)) = case cmdOrError of Left e -> pure $ Left (corrId, entId, ERR e) - Right cmd -> verified <$> verifyTransmission ((,C.cbNonce (bs corrId)) <$> thAuth) tAuth authorized entId cmd + Right cmd -> verified =<< verifyTransmission ((,C.cbNonce (bs corrId)) <$> thAuth) tAuth authorized entId cmd where verified = \case - VRVerified qr -> Right (qr, (corrId, entId, cmd)) - VRFailed -> Left (corrId, entId, ERR AUTH) + VRVerified qr -> pure $ Right (qr, (corrId, entId, cmd)) + VRFailed -> do + case cmd of + Cmd _ SEND {} -> atomically $ modifyTVar' (msgSentAuth stats) (+ 1) + Cmd _ SUB -> atomically $ modifyTVar' (qSubAuth stats) (+ 1) + _ -> pure () + pure $ Left (corrId, entId, ERR AUTH) write q = mapM_ (atomically . writeTBQueue q) . L.nonEmpty send :: Transport c => MVar (THandleSMP c 'TServer) -> Client -> IO () @@ -856,15 +877,19 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi subscribeQueue :: QueueRec -> RecipientId -> M (Transmission BrokerMsg) subscribeQueue qr rId = do + stats <- asks serverStats atomically (TM.lookup rId subscriptions) >>= \case - Nothing -> + Nothing -> do + atomically $ modifyTVar' (qSub stats) (+ 1) newSub >>= deliver Just sub -> readTVarIO sub >>= \case - Sub {subThread = ProhibitSub} -> + Sub {subThread = ProhibitSub} -> do -- cannot use SUB in the same connection where GET was used + atomically $ modifyTVar' (qSubProhibited stats) (+ 1) pure (corrId, rId, ERR $ CMD PROHIBITED) - s -> + s -> do + atomically $ modifyTVar' (qSubDuplicate stats) (+ 1) atomically (tryTakeTMVar $ delivered s) >> deliver sub where newSub :: M (TVar Sub) @@ -958,29 +983,37 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi sendMessage :: QueueRec -> MsgFlags -> MsgBody -> M (Transmission BrokerMsg) sendMessage qr msgFlags msgBody - | B.length msgBody > maxMessageLength thVersion = pure $ err LARGE_MSG - | otherwise = case status qr of - QueueOff -> return $ err AUTH - QueueActive -> - case C.maxLenBS msgBody of - Left _ -> pure $ err LARGE_MSG - Right body -> do - msg_ <- time "SEND" $ do - q <- getStoreMsgQueue "SEND" $ recipientId qr - expireMessages q - atomically . writeMsg q =<< mkMessage body - case msg_ of - Nothing -> pure $ err QUOTA - Just msg -> time "SEND ok" $ do - stats <- asks serverStats - when (notification msgFlags) $ do - atomically . trySendNotification msg =<< asks random - atomically $ modifyTVar' (msgSentNtf stats) (+ 1) - atomically $ updatePeriodStats (activeQueuesNtf stats) (recipientId qr) - atomically $ modifyTVar' (msgSent stats) (+ 1) - atomically $ modifyTVar' (msgCount stats) (+ 1) - atomically $ updatePeriodStats (activeQueues stats) (recipientId qr) - pure ok + | B.length msgBody > maxMessageLength thVersion = do + stats <- asks serverStats + atomically $ modifyTVar' (msgSentLarge stats) (+ 1) + pure $ err LARGE_MSG + | otherwise = do + stats <- asks serverStats + case status qr of + QueueOff -> do + atomically $ modifyTVar' (msgSentAuth stats) (+ 1) + pure $ err AUTH + QueueActive -> + case C.maxLenBS msgBody of + Left _ -> pure $ err LARGE_MSG + Right body -> do + msg_ <- time "SEND" $ do + q <- getStoreMsgQueue "SEND" $ recipientId qr + expireMessages q + atomically . writeMsg q =<< mkMessage body + case msg_ of + Nothing -> do + atomically $ modifyTVar' (msgSentQuota stats) (+ 1) + pure $ err QUOTA + Just msg -> time "SEND ok" $ do + when (notification msgFlags) $ do + atomically . trySendNotification msg =<< asks random + atomically $ modifyTVar' (msgSentNtf stats) (+ 1) + atomically $ updatePeriodStats (activeQueuesNtf stats) (recipientId qr) + atomically $ modifyTVar' (msgSent stats) (+ 1) + atomically $ modifyTVar' (msgCount stats) (+ 1) + atomically $ updatePeriodStats (activeQueues stats) (recipientId qr) + pure ok where THandleParams {thVersion} = thParams' mkMessage :: C.MaxLenBS MaxMessageLen -> M Message diff --git a/src/Simplex/Messaging/Server/Stats.hs b/src/Simplex/Messaging/Server/Stats.hs index d8935b44b..880791c3d 100644 --- a/src/Simplex/Messaging/Server/Stats.hs +++ b/src/Simplex/Messaging/Server/Stats.hs @@ -26,7 +26,14 @@ data ServerStats = ServerStats qDeletedAll :: TVar Int, qDeletedNew :: TVar Int, qDeletedSecured :: TVar Int, + qSub :: TVar Int, + qSubAuth :: TVar Int, + qSubDuplicate :: TVar Int, + qSubProhibited :: TVar Int, msgSent :: TVar Int, + msgSentAuth :: TVar Int, + msgSentQuota :: TVar Int, + msgSentLarge :: TVar Int, msgRecv :: TVar Int, msgExpired :: TVar Int, activeQueues :: PeriodStats RecipientId, @@ -49,7 +56,14 @@ data ServerStatsData = ServerStatsData _qDeletedAll :: Int, _qDeletedNew :: Int, _qDeletedSecured :: Int, + _qSub :: Int, + _qSubAuth :: Int, + _qSubDuplicate :: Int, + _qSubProhibited :: Int, _msgSent :: Int, + _msgSentAuth :: Int, + _msgSentQuota :: Int, + _msgSentLarge :: Int, _msgRecv :: Int, _msgExpired :: Int, _activeQueues :: PeriodStatsData RecipientId, @@ -74,7 +88,14 @@ newServerStats ts = do qDeletedAll <- newTVar 0 qDeletedNew <- newTVar 0 qDeletedSecured <- newTVar 0 + qSub <- newTVar 0 + qSubAuth <- newTVar 0 + qSubDuplicate <- newTVar 0 + qSubProhibited <- newTVar 0 msgSent <- newTVar 0 + msgSentAuth <- newTVar 0 + msgSentQuota <- newTVar 0 + msgSentLarge <- newTVar 0 msgRecv <- newTVar 0 msgExpired <- newTVar 0 activeQueues <- newPeriodStats @@ -88,7 +109,7 @@ newServerStats ts = do pMsgFwdsRecv <- newTVar 0 qCount <- newTVar 0 msgCount <- newTVar 0 - pure ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedNew, qDeletedSecured, msgSent, msgRecv, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, pRelays, pRelaysOwn, pMsgFwds, pMsgFwdsOwn, pMsgFwdsRecv, qCount, msgCount} + pure ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedNew, qDeletedSecured, qSub, qSubAuth, qSubDuplicate, qSubProhibited, msgSent, msgSentAuth, msgSentQuota, msgSentLarge, msgRecv, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, pRelays, pRelaysOwn, pMsgFwds, pMsgFwdsOwn, pMsgFwdsRecv, qCount, msgCount} getServerStatsData :: ServerStats -> STM ServerStatsData getServerStatsData s = do @@ -98,7 +119,14 @@ getServerStatsData s = do _qDeletedAll <- readTVar $ qDeletedAll s _qDeletedNew <- readTVar $ qDeletedNew s _qDeletedSecured <- readTVar $ qDeletedSecured s + _qSub <- readTVar $ qSub s + _qSubAuth <- readTVar $ qSubAuth s + _qSubDuplicate <- readTVar $ qSubDuplicate s + _qSubProhibited <- readTVar $ qSubProhibited s _msgSent <- readTVar $ msgSent s + _msgSentAuth <- readTVar $ msgSentAuth s + _msgSentQuota <- readTVar $ msgSentQuota s + _msgSentLarge <- readTVar $ msgSentLarge s _msgRecv <- readTVar $ msgRecv s _msgExpired <- readTVar $ msgExpired s _activeQueues <- getPeriodStatsData $ activeQueues s @@ -112,7 +140,7 @@ getServerStatsData s = do _pMsgFwdsRecv <- readTVar $ pMsgFwdsRecv s _qCount <- readTVar $ qCount s _msgCount <- readTVar $ msgCount s - pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeletedAll, _qDeletedNew, _qDeletedSecured, _msgSent, _msgRecv, _msgExpired, _activeQueues, _msgSentNtf, _msgRecvNtf, _activeQueuesNtf, _pRelays, _pRelaysOwn, _pMsgFwds, _pMsgFwdsOwn, _pMsgFwdsRecv, _qCount, _msgCount} + pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeletedAll, _qDeletedNew, _qDeletedSecured, _qSub, _qSubAuth, _qSubDuplicate, _qSubProhibited, _msgSent, _msgSentAuth, _msgSentQuota, _msgSentLarge, _msgRecv, _msgExpired, _activeQueues, _msgSentNtf, _msgRecvNtf, _activeQueuesNtf, _pRelays, _pRelaysOwn, _pMsgFwds, _pMsgFwdsOwn, _pMsgFwdsRecv, _qCount, _msgCount} setServerStats :: ServerStats -> ServerStatsData -> STM () setServerStats s d = do @@ -122,7 +150,14 @@ setServerStats s d = do writeTVar (qDeletedAll s) $! _qDeletedAll d writeTVar (qDeletedNew s) $! _qDeletedNew d writeTVar (qDeletedSecured s) $! _qDeletedSecured d + writeTVar (qSub s) $! _qSub d + writeTVar (qSubAuth s) $! _qSubAuth d + writeTVar (qSubDuplicate s) $! _qSubDuplicate d + writeTVar (qSubProhibited s) $! _qSubProhibited d writeTVar (msgSent s) $! _msgSent d + writeTVar (msgSentAuth s) $! _msgSentAuth d + writeTVar (msgSentQuota s) $! _msgSentQuota d + writeTVar (msgSentLarge s) $! _msgSentLarge d writeTVar (msgRecv s) $! _msgRecv d writeTVar (msgExpired s) $! _msgExpired d setPeriodStats (activeQueues s) (_activeQueues d) @@ -147,7 +182,14 @@ instance StrEncoding ServerStatsData where "qDeletedNew=" <> strEncode (_qDeletedNew d), "qDeletedSecured=" <> strEncode (_qDeletedSecured d), "qCount=" <> strEncode (_qCount d), + "qSub=" <> strEncode (_qSub d), + "qSubAuth=" <> strEncode (_qSubAuth d), + "qSubDuplicate=" <> strEncode (_qSubDuplicate d), + "qSubProhibited=" <> strEncode (_qSubProhibited d), "msgSent=" <> strEncode (_msgSent d), + "msgSentAuth=" <> strEncode (_msgSentAuth d), + "msgSentQuota=" <> strEncode (_msgSentQuota d), + "msgSentLarge=" <> strEncode (_msgSentLarge d), "msgRecv=" <> strEncode (_msgRecv d), "msgExpired=" <> strEncode (_msgExpired d), "msgSentNtf=" <> strEncode (_msgSentNtf d), @@ -173,12 +215,19 @@ instance StrEncoding ServerStatsData where (_qDeletedAll, _qDeletedNew, _qDeletedSecured) <- (,0,0) <$> ("qDeleted=" *> strP <* A.endOfLine) <|> ((,,) <$> ("qDeletedAll=" *> strP <* A.endOfLine) <*> ("qDeletedNew=" *> strP <* A.endOfLine) <*> ("qDeletedSecured=" *> strP <* A.endOfLine)) - _qCount <- "qCount=" *> strP <* A.endOfLine <|> pure 0 + _qCount <- opt "qCount=" + _qSub <- opt "qSub=" + _qSubAuth <- opt "qSubAuth=" + _qSubDuplicate <- opt "qSubDuplicate=" + _qSubProhibited <- opt "qSubProhibited=" _msgSent <- "msgSent=" *> strP <* A.endOfLine + _msgSentAuth <- opt "msgSentAuth=" + _msgSentQuota <- opt "msgSentQuota=" + _msgSentLarge <- opt "msgSentLarge=" _msgRecv <- "msgRecv=" *> strP <* A.endOfLine - _msgExpired <- "msgExpired=" *> strP <* A.endOfLine <|> pure 0 - _msgSentNtf <- "msgSentNtf=" *> strP <* A.endOfLine <|> pure 0 - _msgRecvNtf <- "msgRecvNtf=" *> strP <* A.endOfLine <|> pure 0 + _msgExpired <- opt "msgExpired=" + _msgSentNtf <- opt "msgSentNtf=" + _msgRecvNtf <- opt "msgRecvNtf=" _activeQueues <- optional ("activeQueues:" <* A.endOfLine) >>= \case Just _ -> strP <* optional A.endOfLine @@ -195,9 +244,10 @@ instance StrEncoding ServerStatsData where _pRelaysOwn <- proxyStatsP "pRelaysOwn:" _pMsgFwds <- proxyStatsP "pMsgFwds:" _pMsgFwdsOwn <- proxyStatsP "pMsgFwdsOwn:" - _pMsgFwdsRecv <- "pMsgFwdsRecv=" *> strP <* A.endOfLine <|> pure 0 - pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeletedAll, _qDeletedNew, _qDeletedSecured, _msgSent, _msgRecv, _msgExpired, _msgSentNtf, _msgRecvNtf, _activeQueues, _activeQueuesNtf, _pRelays, _pRelaysOwn, _pMsgFwds, _pMsgFwdsOwn, _pMsgFwdsRecv, _qCount, _msgCount = 0} + _pMsgFwdsRecv <- opt "pMsgFwdsRecv=" + pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeletedAll, _qDeletedNew, _qDeletedSecured, _qSub, _qSubAuth, _qSubDuplicate, _qSubProhibited, _msgSent, _msgSentAuth, _msgSentQuota, _msgSentLarge, _msgRecv, _msgExpired, _msgSentNtf, _msgRecvNtf, _activeQueues, _activeQueuesNtf, _pRelays, _pRelaysOwn, _pMsgFwds, _pMsgFwdsOwn, _pMsgFwdsRecv, _qCount, _msgCount = 0} where + opt s = A.string s *> strP <* A.endOfLine <|> pure 0 proxyStatsP key = optional (A.string key >> A.endOfLine) >>= \case Just _ -> strP <* optional A.endOfLine diff --git a/tests/ServerTests.hs b/tests/ServerTests.hs index b0c90ca96..e2ca278d9 100644 --- a/tests/ServerTests.hs +++ b/tests/ServerTests.hs @@ -608,7 +608,7 @@ testRestoreMessages at@(ATransport t) = logSize testStoreLogFile `shouldReturn` 2 logSize testStoreMsgsFile `shouldReturn` 5 - logSize testServerStatsBackupFile `shouldReturn` 45 + logSize testServerStatsBackupFile `shouldReturn` 52 Right stats1 <- strDecode <$> B.readFile testServerStatsBackupFile checkStats stats1 [rId] 5 1 @@ -626,7 +626,7 @@ testRestoreMessages at@(ATransport t) = logSize testStoreLogFile `shouldReturn` 1 -- the last message is not removed because it was not ACK'd logSize testStoreMsgsFile `shouldReturn` 3 - logSize testServerStatsBackupFile `shouldReturn` 45 + logSize testServerStatsBackupFile `shouldReturn` 52 Right stats2 <- strDecode <$> B.readFile testServerStatsBackupFile checkStats stats2 [rId] 5 3 @@ -645,7 +645,7 @@ testRestoreMessages at@(ATransport t) = logSize testStoreLogFile `shouldReturn` 1 logSize testStoreMsgsFile `shouldReturn` 0 - logSize testServerStatsBackupFile `shouldReturn` 45 + logSize testServerStatsBackupFile `shouldReturn` 52 Right stats3 <- strDecode <$> B.readFile testServerStatsBackupFile checkStats stats3 [rId] 5 5