From b76ef03dbef3beea32c98b27e692d45a32bc8f9b Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Mon, 1 Aug 2022 08:42:23 +0100 Subject: [PATCH] ntf: server stats (#487) * nts: server stats * ntf: collect stats, refactor * rename property * fixes --- apps/ntf-server/Main.hs | 51 +++++--- apps/smp-server/Main.hs | 1 - simplexmq.cabal | 1 + src/Simplex/Messaging/Notifications/Server.hs | 115 +++++++++++++++- .../Messaging/Notifications/Server/Env.hs | 15 ++- .../Messaging/Notifications/Server/Stats.hs | 113 ++++++++++++++++ src/Simplex/Messaging/Server.hs | 36 ++--- src/Simplex/Messaging/Server/Stats.hs | 123 ++++++++++++++---- tests/NtfClient.hs | 7 +- 9 files changed, 376 insertions(+), 86 deletions(-) create mode 100644 src/Simplex/Messaging/Notifications/Server/Stats.hs diff --git a/apps/ntf-server/Main.hs b/apps/ntf-server/Main.hs index eec86bd17..8789f6b40 100644 --- a/apps/ntf-server/Main.hs +++ b/apps/ntf-server/Main.hs @@ -1,9 +1,12 @@ {-# LANGUAGE DuplicateRecordFields #-} {-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE OverloadedStrings #-} module Main where import Control.Logger.Simple +import Data.Functor (($>)) +import Data.Ini (lookupValue) import Simplex.Messaging.Client.Agent (defaultSMPClientAgentConfig) import Simplex.Messaging.Notifications.Server (runNtfServer) import Simplex.Messaging.Notifications.Server.Env (NtfServerConfig (..)) @@ -50,28 +53,36 @@ ntfServerCLIConfig = \# This option enables saving memory to append only log,\n\ \# and restoring it when the server is started.\n\ \# Log is compacted on start (deleted objects are removed).\n\ - \# The messages are not logged.\n" - <> ("enable: " <> (if enableStoreLog then "on" else "off") <> "\n\n") - <> "[TRANSPORT]\n\ + \enable: " + <> (if enableStoreLog then "on" else "off") + <> "\n\ + \log_stats: off\n\n\ + \[TRANSPORT]\n\ \port: " <> defaultServerPort <> "\n\ \websockets: off\n", - mkServerConfig = \storeLogFile transports _ -> - NtfServerConfig - { transports, - subIdBytes = 24, - regCodeBytes = 32, - clientQSize = 16, - subQSize = 64, - pushQSize = 128, - smpAgentCfg = defaultSMPClientAgentConfig, - apnsConfig = defaultAPNSPushClientConfig, - inactiveClientExpiration = Nothing, - storeLogFile, - resubscribeDelay = 50000, -- 50ms - caCertificateFile = caCrtFile, - privateKeyFile = serverKeyFile, - certificateFile = serverCrtFile - } + mkServerConfig = \storeLogFile transports ini -> + let settingIsOn section name = if lookupValue section name ini == Right "on" then Just () else Nothing + logStats = settingIsOn "STORE_LOG" "log_stats" + in NtfServerConfig + { transports, + subIdBytes = 24, + regCodeBytes = 32, + clientQSize = 16, + subQSize = 64, + pushQSize = 128, + smpAgentCfg = defaultSMPClientAgentConfig, + apnsConfig = defaultAPNSPushClientConfig, + inactiveClientExpiration = Nothing, + storeLogFile, + resubscribeDelay = 50000, -- 50ms + caCertificateFile = caCrtFile, + privateKeyFile = serverKeyFile, + certificateFile = serverCrtFile, + logStatsInterval = logStats $> 86400, -- seconds + logStatsStartTime = 0, -- seconds from 00:00 UTC + serverStatsLogFile = combine logPath "ntf-server-stats.daily.log", + serverStatsBackupFile = logStats $> combine logPath "ntf-server-stats.log" + } } diff --git a/apps/smp-server/Main.hs b/apps/smp-server/Main.hs index 94033adf9..7e7438867 100644 --- a/apps/smp-server/Main.hs +++ b/apps/smp-server/Main.hs @@ -2,7 +2,6 @@ {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE NumericUnderscores #-} {-# LANGUAGE OverloadedStrings #-} -{-# LANGUAGE TypeApplications #-} module Main where diff --git a/simplexmq.cabal b/simplexmq.cabal index a2f633e31..48451b532 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -61,6 +61,7 @@ library Simplex.Messaging.Notifications.Server Simplex.Messaging.Notifications.Server.Env Simplex.Messaging.Notifications.Server.Push.APNS + Simplex.Messaging.Notifications.Server.Stats Simplex.Messaging.Notifications.Server.Store Simplex.Messaging.Notifications.Server.StoreLog Simplex.Messaging.Notifications.Transport diff --git a/src/Simplex/Messaging/Notifications/Server.hs b/src/Simplex/Messaging/Notifications/Server.hs index b1ed82a8b..af404c732 100644 --- a/src/Simplex/Messaging/Notifications/Server.hs +++ b/src/Simplex/Messaging/Notifications/Server.hs @@ -4,8 +4,10 @@ {-# LANGUAGE GADTs #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE NumericUnderscores #-} {-# LANGUAGE OverloadedLists #-} {-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE PatternSynonyms #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TupleSections #-} @@ -18,30 +20,40 @@ import Control.Monad.IO.Unlift (MonadUnliftIO) import Control.Monad.Reader import Crypto.Random (MonadRandom) import Data.ByteString.Char8 (ByteString) +import qualified Data.ByteString.Char8 as B import Data.Functor (($>)) +import Data.List (intercalate) import Data.Map.Strict (Map) import qualified Data.Text as T +import Data.Time.Clock (UTCTime (..), diffTimeToPicoseconds, getCurrentTime) import Data.Time.Clock.System (getSystemTime) +import Data.Time.Format.ISO8601 (iso8601Show) import Network.Socket (ServiceName) import Simplex.Messaging.Client (ProtocolClientError (..)) import Simplex.Messaging.Client.Agent import qualified Simplex.Messaging.Crypto as C +import Simplex.Messaging.Encoding.String import Simplex.Messaging.Notifications.Protocol import Simplex.Messaging.Notifications.Server.Env import Simplex.Messaging.Notifications.Server.Push.APNS (PNMessageData (..), PushNotification (..), PushProviderError (..)) +import Simplex.Messaging.Notifications.Server.Stats import Simplex.Messaging.Notifications.Server.Store import Simplex.Messaging.Notifications.Server.StoreLog import Simplex.Messaging.Notifications.Transport import Simplex.Messaging.Protocol (ErrorType (..), ProtocolServer (host), SMPServer, SignedTransmission, Transmission, encodeTransmission, tGet, tPut) import qualified Simplex.Messaging.Protocol as SMP import Simplex.Messaging.Server +import Simplex.Messaging.Server.Stats import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Transport (ATransport (..), THandle (..), TProxy, Transport (..)) import Simplex.Messaging.Transport.Server (runTransportServer) import Simplex.Messaging.Util +import System.Exit (exitFailure) +import System.IO (BufferMode (..), hPutStrLn, hSetBuffering) import System.Mem.Weak (deRefWeak) -import UnliftIO (IOMode (..), async, uninterruptibleCancel) +import UnliftIO (IOMode (..), async, uninterruptibleCancel, withFile) import UnliftIO.Concurrent (forkIO, killThread, mkWeakThreadId, threadDelay) +import UnliftIO.Directory (doesFileExist, renameFile) import UnliftIO.Exception import UnliftIO.STM @@ -54,12 +66,13 @@ runNtfServerBlocking :: (MonadRandom m, MonadUnliftIO m) => TMVar Bool -> NtfSer runNtfServerBlocking started cfg = runReaderT (ntfServer cfg started) =<< newNtfServerEnv cfg ntfServer :: forall m. (MonadUnliftIO m, MonadReader NtfEnv m) => NtfServerConfig -> TMVar Bool -> m () -ntfServer NtfServerConfig {transports} started = do +ntfServer cfg@NtfServerConfig {transports} started = do + restoreServerStats s <- asks subscriber ps <- asks pushServer subs <- readTVarIO =<< asks (subscriptions . store) void . forkIO $ resubscribe s subs - raceAny_ (ntfSubscriber s : ntfPush ps : map runServer transports) `finally` stopServer + raceAny_ (ntfSubscriber s : ntfPush ps : map runServer transports <> serverStatsThread_ cfg) `finally` stopServer where runServer :: (ServiceName, ATransport) -> m () runServer (tcpPort, ATransport t) = do @@ -76,8 +89,55 @@ ntfServer NtfServerConfig {transports} started = do stopServer :: m () stopServer = do withNtfLog closeStoreLog + saveServerStats asks (smpSubscribers . subscriber) >>= readTVarIO >>= mapM_ (\SMPSubscriber {subThreadId} -> readTVarIO subThreadId >>= mapM_ (liftIO . deRefWeak >=> mapM_ killThread)) + serverStatsThread_ :: NtfServerConfig -> [m ()] + serverStatsThread_ NtfServerConfig {logStatsInterval = Just interval, logStatsStartTime, serverStatsLogFile} = + [logServerStats logStatsStartTime interval serverStatsLogFile] + serverStatsThread_ _ = [] + + logServerStats :: Int -> Int -> FilePath -> m () + logServerStats startAt logInterval statsFilePath = do + initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime + liftIO $ putStrLn $ "server stats log enabled: " <> statsFilePath + threadDelay $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0) + NtfServerStats {fromTime, tknCreated, tknVerified, tknDeleted, subCreated, subDeleted, ntfReceived, ntfDelivered, activeTokens, activeSubs} <- asks serverStats + let interval = 1000000 * logInterval + withFile statsFilePath AppendMode $ \h -> liftIO $ do + hSetBuffering h LineBuffering + forever $ do + ts <- getCurrentTime + fromTime' <- atomically $ swapTVar fromTime ts + tknCreated' <- atomically $ swapTVar tknCreated 0 + tknVerified' <- atomically $ swapTVar tknVerified 0 + tknDeleted' <- atomically $ swapTVar tknDeleted 0 + subCreated' <- atomically $ swapTVar subCreated 0 + subDeleted' <- atomically $ swapTVar subDeleted 0 + ntfReceived' <- atomically $ swapTVar ntfReceived 0 + ntfDelivered' <- atomically $ swapTVar ntfDelivered 0 + tkn <- atomically $ periodStatCounts activeTokens ts + sub <- atomically $ periodStatCounts activeSubs ts + hPutStrLn h $ + intercalate + "," + [ iso8601Show $ utctDay fromTime', + show tknCreated', + show tknVerified', + show tknDeleted', + show subCreated', + show subDeleted', + show ntfReceived', + show ntfDelivered', + dayCount tkn, + weekCount tkn, + monthCount tkn, + dayCount sub, + weekCount sub, + monthCount sub + ] + threadDelay interval + resubscribe :: (MonadUnliftIO m, MonadReader NtfEnv m) => NtfSubscriber -> Map NtfSubscriptionId NtfSubData -> m () resubscribe NtfSubscriber {newSubQ} subs = do d <- asks $ resubscribeDelay . config @@ -137,9 +197,12 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge ntfTs <- liftIO getSystemTime st <- asks store NtfPushServer {pushQ} <- asks pushServer + stats <- asks serverStats + atomically $ updatePeriodStats (activeSubs stats) ntfId atomically $ findNtfSubscriptionToken st smpQueue >>= mapM_ (\tkn -> writeTBQueue pushQ (tkn, PNMessage PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta})) + incNtfStat ntfReceived SMP.END -> updateSubStatus smpQueue NSEnd _ -> pure () pure () @@ -193,18 +256,21 @@ ntfPush s@NtfPushServer {pushQ} = forever $ do liftIO $ logDebug $ "sending push notification to " <> T.pack (show pp) status <- readTVarIO tknStatus case (status, ntf) of - (_, PNVerification _) -> do + (_, PNVerification _) -> -- TODO check token status deliverNotification pp tkn ntf >>= \case Right _ -> do status_ <- atomically $ stateTVar tknStatus $ \status' -> if status' == NTActive then (Nothing, NTActive) else (Just NTConfirmed, NTConfirmed) forM_ status_ $ \status' -> withNtfLog $ \sl -> logTokenStatus sl ntfTknId status' _ -> pure () - (NTActive, PNCheckMessages) -> do + (NTActive, PNCheckMessages) -> void $ deliverNotification pp tkn ntf (NTActive, PNMessage {}) -> do + stats <- asks serverStats + atomically $ updatePeriodStats (activeTokens stats) ntfTknId void $ deliverNotification pp tkn ntf - _ -> do + incNtfStat ntfDelivered + _ -> liftIO $ logError "bad notification token status" where deliverNotification :: PushProvider -> NtfTknData -> PushNotification -> m (Either PushProviderError ()) @@ -347,6 +413,7 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu atomically $ addNtfToken st tknId tkn atomically $ writeTBQueue pushQ (tkn, PNVerification regCode) withNtfLog (`logCreateToken` tkn) + incNtfStat tknCreated pure (corrId, "", NRTknId tknId srvDhPubKey) NtfReqCmd SToken (NtfTkn tkn@NtfTknData {ntfTknId, tknStatus, tknRegCode, tknDhSecret, tknDhKeys = (srvDhPubKey, srvDhPrivKey), tknCronInterval}) (corrId, tknId, cmd) -> do status <- readTVarIO tknStatus @@ -368,6 +435,7 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu tIds <- atomically $ removeInactiveTokenRegistrations st tkn forM_ tIds cancelInvervalNotifications withNtfLog $ \s -> logTokenStatus s tknId NTActive + incNtfStat tknVerified pure NROk | otherwise -> do logDebug "TVFY - incorrect code or token status" @@ -386,6 +454,8 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu addNtfToken st tknId tkn' writeTBQueue pushQ (tkn', PNVerification regCode) withNtfLog $ \s -> logUpdateToken s tknId token' regCode + incNtfStat tknDeleted + incNtfStat tknCreated pure NROk TDEL -> do logDebug "TDEL" @@ -395,6 +465,7 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu atomically $ removeSubscription ca smpServer (SPNotifier, notifierId) cancelInvervalNotifications tknId withNtfLog (`logDeleteToken` tknId) + incNtfStat tknDeleted pure NROk TCRN 0 -> do logDebug "TCRN 0" @@ -434,6 +505,7 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu Just _ -> atomically (writeTBQueue newSubQ $ NtfSub sub) $> NRSubId subId _ -> pure $ NRErr AUTH withNtfLog (`logCreateSubscription` sub) + incNtfStat subCreated pure (corrId, "", resp) NtfReqCmd SSubscription (NtfSub NtfSubData {smpQueue = SMPQueueNtf {smpServer, notifierId}, notifierKey = registeredNKey, subStatus}) (corrId, subId, cmd) -> do status <- readTVarIO subStatus @@ -454,6 +526,7 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu atomically $ deleteNtfSubscription st subId atomically $ removeSubscription ca smpServer (SPNotifier, notifierId) withNtfLog (`logDeleteSubscription` subId) + incNtfStat subDeleted pure NROk PING -> pure NRPong getId :: m NtfEntityId @@ -471,3 +544,33 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu withNtfLog :: (MonadUnliftIO m, MonadReader NtfEnv m) => (StoreLog 'WriteMode -> IO a) -> m () withNtfLog action = liftIO . mapM_ action =<< asks storeLog + +incNtfStat :: (MonadUnliftIO m, MonadReader NtfEnv m) => (NtfServerStats -> TVar Int) -> m () +incNtfStat statSel = do + stats <- asks serverStats + atomically $ modifyTVar (statSel stats) (+ 1) + +saveServerStats :: (MonadUnliftIO m, MonadReader NtfEnv m) => m () +saveServerStats = + asks (serverStatsBackupFile . config) + >>= mapM_ (\f -> asks serverStats >>= atomically . getNtfServerStatsData >>= liftIO . saveStats f) + where + saveStats f stats = do + logInfo $ "saving server stats to file " <> T.pack f + B.writeFile f $ strEncode stats + logInfo "server stats saved" + +restoreServerStats :: (MonadUnliftIO m, MonadReader NtfEnv m) => m () +restoreServerStats = asks (serverStatsBackupFile . config) >>= mapM_ restoreStats + where + restoreStats f = whenM (doesFileExist f) $ do + logInfo $ "restoring server stats from file " <> T.pack f + liftIO (strDecode <$> B.readFile f) >>= \case + Right d -> do + s <- asks serverStats + atomically $ setNtfServerStats s d + renameFile f $ f <> ".bak" + logInfo "server stats restored" + Left e -> do + logInfo $ "error restoring server stats: " <> T.pack e + liftIO exitFailure diff --git a/src/Simplex/Messaging/Notifications/Server/Env.hs b/src/Simplex/Messaging/Notifications/Server/Env.hs index c60239b32..95f7c45ae 100644 --- a/src/Simplex/Messaging/Notifications/Server/Env.hs +++ b/src/Simplex/Messaging/Notifications/Server/Env.hs @@ -11,6 +11,7 @@ import Control.Concurrent.Async (Async) import Control.Monad.IO.Unlift import Crypto.Random import Data.ByteString.Char8 (ByteString) +import Data.Time.Clock (getCurrentTime) import Data.Time.Clock.System (SystemTime) import Data.Word (Word16) import Data.X509.Validation (Fingerprint (..)) @@ -21,6 +22,7 @@ import Simplex.Messaging.Client.Agent import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Notifications.Protocol import Simplex.Messaging.Notifications.Server.Push.APNS +import Simplex.Messaging.Notifications.Server.Stats import Simplex.Messaging.Notifications.Server.Store import Simplex.Messaging.Notifications.Server.StoreLog import Simplex.Messaging.Protocol (CorrId, SMPServer, Transmission) @@ -48,7 +50,12 @@ data NtfServerConfig = NtfServerConfig -- CA certificate private key is not needed for initialization caCertificateFile :: FilePath, privateKeyFile :: FilePath, - certificateFile :: FilePath + certificateFile :: FilePath, + -- stats config - see SMP server config + logStatsInterval :: Maybe Int, + logStatsStartTime :: Int, + serverStatsLogFile :: FilePath, + serverStatsBackupFile :: Maybe FilePath } defaultInactiveClientExpiration :: ExpirationConfig @@ -67,7 +74,8 @@ data NtfEnv = NtfEnv idsDrg :: TVar ChaChaDRG, serverIdentity :: C.KeyHash, tlsServerParams :: T.ServerParams, - serverIdentity :: C.KeyHash + serverIdentity :: C.KeyHash, + serverStats :: NtfServerStats } newNtfServerEnv :: (MonadUnliftIO m, MonadRandom m) => NtfServerConfig -> m NtfEnv @@ -79,7 +87,8 @@ newNtfServerEnv config@NtfServerConfig {subQSize, pushQSize, smpAgentCfg, apnsCo pushServer <- atomically $ newNtfPushServer pushQSize apnsConfig tlsServerParams <- liftIO $ loadTLSServerParams caCertificateFile certificateFile privateKeyFile Fingerprint fp <- liftIO $ loadFingerprint caCertificateFile - pure NtfEnv {config, subscriber, pushServer, store, storeLog, idsDrg, tlsServerParams, serverIdentity = C.KeyHash fp} + serverStats <- atomically . newNtfServerStats =<< liftIO getCurrentTime + pure NtfEnv {config, subscriber, pushServer, store, storeLog, idsDrg, tlsServerParams, serverIdentity = C.KeyHash fp, serverStats} data NtfSubscriber = NtfSubscriber { smpSubscribers :: TMap SMPServer SMPSubscriber, diff --git a/src/Simplex/Messaging/Notifications/Server/Stats.hs b/src/Simplex/Messaging/Notifications/Server/Stats.hs new file mode 100644 index 000000000..6af4b0611 --- /dev/null +++ b/src/Simplex/Messaging/Notifications/Server/Stats.hs @@ -0,0 +1,113 @@ +{-# LANGUAGE DuplicateRecordFields #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE OverloadedStrings #-} + +module Simplex.Messaging.Notifications.Server.Stats where + +import Control.Applicative (optional) +import qualified Data.Attoparsec.ByteString.Char8 as A +import qualified Data.ByteString.Char8 as B +import Data.Time.Clock (UTCTime) +import Simplex.Messaging.Encoding.String +import Simplex.Messaging.Notifications.Protocol (NtfTokenId) +import Simplex.Messaging.Protocol (NotifierId) +import Simplex.Messaging.Server.Stats +import UnliftIO.STM + +data NtfServerStats = NtfServerStats + { fromTime :: TVar UTCTime, + tknCreated :: TVar Int, + tknVerified :: TVar Int, + tknDeleted :: TVar Int, + subCreated :: TVar Int, + subDeleted :: TVar Int, + ntfReceived :: TVar Int, + ntfDelivered :: TVar Int, + activeTokens :: PeriodStats NtfTokenId, + activeSubs :: PeriodStats NotifierId + } + +data NtfServerStatsData = NtfServerStatsData + { _fromTime :: UTCTime, + _tknCreated :: Int, + _tknVerified :: Int, + _tknDeleted :: Int, + _subCreated :: Int, + _subDeleted :: Int, + _ntfReceived :: Int, + _ntfDelivered :: Int, + _activeTokens :: PeriodStatsData NtfTokenId, + _activeSubs :: PeriodStatsData NotifierId + } + +newNtfServerStats :: UTCTime -> STM NtfServerStats +newNtfServerStats ts = do + fromTime <- newTVar ts + tknCreated <- newTVar 0 + tknVerified <- newTVar 0 + tknDeleted <- newTVar 0 + subCreated <- newTVar 0 + subDeleted <- newTVar 0 + ntfReceived <- newTVar 0 + ntfDelivered <- newTVar 0 + activeTokens <- newPeriodStats + activeSubs <- newPeriodStats + pure NtfServerStats {fromTime, tknCreated, tknVerified, tknDeleted, subCreated, subDeleted, ntfReceived, ntfDelivered, activeTokens, activeSubs} + +getNtfServerStatsData :: NtfServerStats -> STM NtfServerStatsData +getNtfServerStatsData s = do + _fromTime <- readTVar $ fromTime (s :: NtfServerStats) + _tknCreated <- readTVar $ tknCreated s + _tknVerified <- readTVar $ tknVerified s + _tknDeleted <- readTVar $ tknDeleted s + _subCreated <- readTVar $ subCreated s + _subDeleted <- readTVar $ subDeleted s + _ntfReceived <- readTVar $ ntfReceived s + _ntfDelivered <- readTVar $ ntfDelivered s + _activeTokens <- getPeriodStatsData $ activeTokens s + _activeSubs <- getPeriodStatsData $ activeSubs s + pure NtfServerStatsData {_fromTime, _tknCreated, _tknVerified, _tknDeleted, _subCreated, _subDeleted, _ntfReceived, _ntfDelivered, _activeTokens, _activeSubs} + +setNtfServerStats :: NtfServerStats -> NtfServerStatsData -> STM () +setNtfServerStats s d = do + writeTVar (fromTime (s :: NtfServerStats)) (_fromTime (d :: NtfServerStatsData)) + writeTVar (tknCreated s) (_tknCreated d) + writeTVar (tknVerified s) (_tknVerified d) + writeTVar (tknDeleted s) (_tknDeleted d) + writeTVar (subCreated s) (_subCreated d) + writeTVar (subDeleted s) (_subDeleted d) + writeTVar (ntfReceived s) (_ntfReceived d) + writeTVar (ntfDelivered s) (_ntfDelivered d) + setPeriodStats (activeTokens s) (_activeTokens d) + setPeriodStats (activeSubs s) (_activeSubs d) + +instance StrEncoding NtfServerStatsData where + strEncode NtfServerStatsData {_fromTime, _tknCreated, _tknVerified, _tknDeleted, _subCreated, _subDeleted, _ntfReceived, _ntfDelivered, _activeTokens, _activeSubs} = + B.unlines + [ "fromTime=" <> strEncode _fromTime, + "tknCreated=" <> strEncode _tknCreated, + "tknVerified=" <> strEncode _tknVerified, + "tknDeleted=" <> strEncode _tknDeleted, + "subCreated=" <> strEncode _subCreated, + "subDeleted=" <> strEncode _subDeleted, + "ntfReceived=" <> strEncode _ntfReceived, + "ntfDelivered=" <> strEncode _ntfDelivered, + "activeTokens:", + strEncode _activeTokens, + "activeSubs:", + strEncode _activeSubs + ] + strP = do + _fromTime <- "fromTime=" *> strP <* A.endOfLine + _tknCreated <- "tknCreated=" *> strP <* A.endOfLine + _tknVerified <- "tknVerified=" *> strP <* A.endOfLine + _tknDeleted <- "tknDeleted=" *> strP <* A.endOfLine + _subCreated <- "subCreated=" *> strP <* A.endOfLine + _subDeleted <- "subDeleted=" *> strP <* A.endOfLine + _ntfReceived <- "ntfReceived=" *> strP <* A.endOfLine + _ntfDelivered <- "ntfDelivered=" *> strP <* A.endOfLine + _ <- "activeTokens:" <* A.endOfLine + _activeTokens <- strP <* A.endOfLine + _ <- "activeSubs:" <* A.endOfLine + _activeSubs <- strP <* optional A.endOfLine + pure NtfServerStatsData {_fromTime, _tknCreated, _tknVerified, _tknDeleted, _subCreated, _subDeleted, _ntfReceived, _ntfDelivered, _activeTokens, _activeSubs} diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 347476253..71e49efab 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -50,12 +50,8 @@ import Data.List (intercalate) import qualified Data.List.NonEmpty as L import qualified Data.Map.Strict as M import Data.Maybe (isNothing) -import Data.Set (Set) -import qualified Data.Set as S import qualified Data.Text as T import Data.Text.Encoding (decodeLatin1) -import Data.Time.Calendar.Month.Compat (pattern MonthDay) -import Data.Time.Calendar.OrdinalDate (mondayStartWeek) import Data.Time.Clock (UTCTime (..), diffTimeToPicoseconds, getCurrentTime) import Data.Time.Clock.System (SystemTime (..), getSystemTime) import Data.Time.Format.ISO8601 (iso8601Show) @@ -177,7 +173,7 @@ smpServer started = do initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime liftIO $ putStrLn $ "server stats log enabled: " <> statsFilePath threadDelay $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0) - ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, dayMsgQueues, weekMsgQueues, monthMsgQueues} <- asks serverStats + ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, activeQueues} <- asks serverStats let interval = 1000000 * logInterval withFile statsFilePath AppendMode $ \h -> liftIO $ do hSetBuffering h LineBuffering @@ -189,17 +185,9 @@ smpServer started = do qDeleted' <- atomically $ swapTVar qDeleted 0 msgSent' <- atomically $ swapTVar msgSent 0 msgRecv' <- atomically $ swapTVar msgRecv 0 - let day = utctDay ts - (_, wDay) = mondayStartWeek day - MonthDay _ mDay = day - (dayMsgQueues', weekMsgQueues', monthMsgQueues') <- - atomically $ (,,) <$> periodCount 1 dayMsgQueues <*> periodCount wDay weekMsgQueues <*> periodCount mDay monthMsgQueues - hPutStrLn h $ intercalate "," [iso8601Show $ utctDay fromTime', show qCreated', show qSecured', show qDeleted', show msgSent', show msgRecv', dayMsgQueues', weekMsgQueues', monthMsgQueues'] + ps <- atomically $ periodStatCounts activeQueues ts + hPutStrLn h $ intercalate "," [iso8601Show $ utctDay fromTime', show qCreated', show qSecured', show qDeleted', show msgSent', show msgRecv', dayCount ps, weekCount ps, monthCount ps] threadDelay interval - where - periodCount :: Int -> TVar (Set RecipientId) -> STM String - periodCount 1 pVar = show . S.size <$> swapTVar pVar S.empty - periodCount _ _ = pure "" runClient :: Transport c => TProxy c -> c -> m () runClient _ h = do @@ -538,15 +526,7 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Serv updateStats = do stats <- asks serverStats atomically $ modifyTVar (msgRecv stats) (+ 1) - atomically $ updateActiveQueues stats queueId - - updateActiveQueues :: ServerStats -> RecipientId -> STM () - updateActiveQueues stats qId = do - updatePeriod dayMsgQueues - updatePeriod weekMsgQueues - updatePeriod monthMsgQueues - where - updatePeriod pSel = modifyTVar (pSel stats) (S.insert qId) + atomically $ updatePeriodStats (activeQueues stats) queueId sendMessage :: QueueRec -> MsgFlags -> MsgBody -> m (Transmission BrokerMsg) sendMessage qr msgFlags msgBody @@ -571,7 +551,7 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Serv when (sent == OK) $ do stats <- asks serverStats atomically $ modifyTVar (msgSent stats) (+ 1) - atomically $ updateActiveQueues stats $ recipientId qr + atomically $ updatePeriodStats (activeQueues stats) (recipientId qr) pure resp where mkMessage :: C.MaxLenBS MaxMessageLen -> m Message @@ -743,7 +723,9 @@ restoreServerStats = asks (serverStatsBackupFile . config) >>= mapM_ restoreStat liftIO (strDecode <$> B.readFile f) >>= \case Right d -> do s <- asks serverStats - atomically $ setServerStatsData s d + atomically $ setServerStats s d renameFile f $ f <> ".bak" logInfo "server stats restored" - Left e -> logInfo $ "error restoring server stats: " <> T.pack e + Left e -> do + logInfo $ "error restoring server stats: " <> T.pack e + liftIO exitFailure diff --git a/src/Simplex/Messaging/Server/Stats.hs b/src/Simplex/Messaging/Server/Stats.hs index f99dc135e..44c1b97d7 100644 --- a/src/Simplex/Messaging/Server/Stats.hs +++ b/src/Simplex/Messaging/Server/Stats.hs @@ -1,5 +1,7 @@ {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE PatternSynonyms #-} +{-# LANGUAGE ScopedTypeVariables #-} module Simplex.Messaging.Server.Stats where @@ -8,7 +10,9 @@ import qualified Data.Attoparsec.ByteString.Char8 as A import qualified Data.ByteString.Char8 as B import Data.Set (Set) import qualified Data.Set as S -import Data.Time.Clock (UTCTime) +import Data.Time.Calendar.Month.Compat (pattern MonthDay) +import Data.Time.Calendar.OrdinalDate (mondayStartWeek) +import Data.Time.Clock (UTCTime (..)) import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol (RecipientId) import UnliftIO.STM @@ -20,9 +24,7 @@ data ServerStats = ServerStats qDeleted :: TVar Int, msgSent :: TVar Int, msgRecv :: TVar Int, - dayMsgQueues :: TVar (Set RecipientId), - weekMsgQueues :: TVar (Set RecipientId), - monthMsgQueues :: TVar (Set RecipientId) + activeQueues :: PeriodStats RecipientId } data ServerStatsData = ServerStatsData @@ -32,9 +34,7 @@ data ServerStatsData = ServerStatsData _qDeleted :: Int, _msgSent :: Int, _msgRecv :: Int, - _dayMsgQueues :: Set RecipientId, - _weekMsgQueues :: Set RecipientId, - _monthMsgQueues :: Set RecipientId + _activeQueues :: PeriodStatsData RecipientId } newServerStats :: UTCTime -> STM ServerStats @@ -45,10 +45,8 @@ newServerStats ts = do qDeleted <- newTVar 0 msgSent <- newTVar 0 msgRecv <- newTVar 0 - dayMsgQueues <- newTVar S.empty - weekMsgQueues <- newTVar S.empty - monthMsgQueues <- newTVar S.empty - pure ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, dayMsgQueues, weekMsgQueues, monthMsgQueues} + activeQueues <- newPeriodStats + pure ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, activeQueues} getServerStatsData :: ServerStats -> STM ServerStatsData getServerStatsData s = do @@ -58,25 +56,21 @@ getServerStatsData s = do _qDeleted <- readTVar $ qDeleted s _msgSent <- readTVar $ msgSent s _msgRecv <- readTVar $ msgRecv s - _dayMsgQueues <- readTVar $ dayMsgQueues s - _weekMsgQueues <- readTVar $ weekMsgQueues s - _monthMsgQueues <- readTVar $ monthMsgQueues s - pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _dayMsgQueues, _weekMsgQueues, _monthMsgQueues} + _activeQueues <- getPeriodStatsData $ activeQueues s + pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _activeQueues} -setServerStatsData :: ServerStats -> ServerStatsData -> STM () -setServerStatsData s d = do +setServerStats :: ServerStats -> ServerStatsData -> STM () +setServerStats s d = do writeTVar (fromTime s) (_fromTime d) writeTVar (qCreated s) (_qCreated d) writeTVar (qSecured s) (_qSecured d) writeTVar (qDeleted s) (_qDeleted d) writeTVar (msgSent s) (_msgSent d) writeTVar (msgRecv s) (_msgRecv d) - writeTVar (dayMsgQueues s) (_dayMsgQueues d) - writeTVar (weekMsgQueues s) (_weekMsgQueues d) - writeTVar (monthMsgQueues s) (_monthMsgQueues d) + setPeriodStats (activeQueues s) (_activeQueues d) instance StrEncoding ServerStatsData where - strEncode ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _dayMsgQueues, _weekMsgQueues, _monthMsgQueues} = + strEncode ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _activeQueues} = B.unlines [ "fromTime=" <> strEncode _fromTime, "qCreated=" <> strEncode _qCreated, @@ -84,9 +78,8 @@ instance StrEncoding ServerStatsData where "qDeleted=" <> strEncode _qDeleted, "msgSent=" <> strEncode _msgSent, "msgRecv=" <> strEncode _msgRecv, - "dayMsgQueues=" <> strEncode _dayMsgQueues, - "weekMsgQueues=" <> strEncode _weekMsgQueues, - "monthMsgQueues=" <> strEncode _monthMsgQueues + "activeQueues:", + strEncode _activeQueues ] strP = do _fromTime <- "fromTime=" *> strP <* A.endOfLine @@ -95,7 +88,81 @@ instance StrEncoding ServerStatsData where _qDeleted <- "qDeleted=" *> strP <* A.endOfLine _msgSent <- "msgSent=" *> strP <* A.endOfLine _msgRecv <- "msgRecv=" *> strP <* A.endOfLine - _dayMsgQueues <- "dayMsgQueues=" *> strP <* A.endOfLine - _weekMsgQueues <- "weekMsgQueues=" *> strP <* A.endOfLine - _monthMsgQueues <- "monthMsgQueues=" *> strP <* optional A.endOfLine - pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _dayMsgQueues, _weekMsgQueues, _monthMsgQueues} + r <- optional ("activeQueues:" <* A.endOfLine) + _activeQueues <- case r of + Just _ -> strP <* optional A.endOfLine + _ -> do + _day <- "dayMsgQueues=" *> strP <* A.endOfLine + _week <- "weekMsgQueues=" *> strP <* A.endOfLine + _month <- "monthMsgQueues=" *> strP <* optional A.endOfLine + pure PeriodStatsData {_day, _week, _month} + pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _activeQueues} + +data PeriodStats a = PeriodStats + { day :: TVar (Set a), + week :: TVar (Set a), + month :: TVar (Set a) + } + +newPeriodStats :: STM (PeriodStats a) +newPeriodStats = do + day <- newTVar S.empty + week <- newTVar S.empty + month <- newTVar S.empty + pure PeriodStats {day, week, month} + +data PeriodStatsData a = PeriodStatsData + { _day :: Set a, + _week :: Set a, + _month :: Set a + } + +getPeriodStatsData :: PeriodStats a -> STM (PeriodStatsData a) +getPeriodStatsData s = do + _day <- readTVar $ day s + _week <- readTVar $ week s + _month <- readTVar $ month s + pure PeriodStatsData {_day, _week, _month} + +setPeriodStats :: PeriodStats a -> PeriodStatsData a -> STM () +setPeriodStats s d = do + writeTVar (day s) (_day d) + writeTVar (week s) (_week d) + writeTVar (month s) (_month d) + +instance (Ord a, StrEncoding a) => StrEncoding (PeriodStatsData a) where + strEncode PeriodStatsData {_day, _week, _month} = + "day=" <> strEncode _day <> "\nweek=" <> strEncode _week <> "\nmonth=" <> strEncode _month + strP = do + _day <- "day=" *> strP <* A.endOfLine + _week <- "week=" *> strP <* A.endOfLine + _month <- "month=" *> strP + pure PeriodStatsData {_day, _week, _month} + +data PeriodStatCounts = PeriodStatCounts + { dayCount :: String, + weekCount :: String, + monthCount :: String + } + +periodStatCounts :: forall a. PeriodStats a -> UTCTime -> STM PeriodStatCounts +periodStatCounts ps ts = do + let d = utctDay ts + (_, wDay) = mondayStartWeek d + MonthDay _ mDay = d + dayCount <- periodCount 1 $ day ps + weekCount <- periodCount wDay $ week ps + monthCount <- periodCount mDay $ month ps + pure PeriodStatCounts {dayCount, weekCount, monthCount} + where + periodCount :: Int -> TVar (Set a) -> STM String + periodCount 1 pVar = show . S.size <$> swapTVar pVar S.empty + periodCount _ _ = pure "" + +updatePeriodStats :: Ord a => PeriodStats a -> a -> STM () +updatePeriodStats stats pId = do + updatePeriod day + updatePeriod week + updatePeriod month + where + updatePeriod pSel = modifyTVar (pSel stats) (S.insert pId) diff --git a/tests/NtfClient.hs b/tests/NtfClient.hs index 70fe9b024..ca16c5d53 100644 --- a/tests/NtfClient.hs +++ b/tests/NtfClient.hs @@ -94,7 +94,12 @@ ntfServerCfg = -- CA certificate private key is not needed for initialization caCertificateFile = "tests/fixtures/ca.crt", privateKeyFile = "tests/fixtures/server.key", - certificateFile = "tests/fixtures/server.crt" + certificateFile = "tests/fixtures/server.crt", + -- stats config + logStatsInterval = Nothing, + logStatsStartTime = 0, + serverStatsLogFile = "tests/ntf-server-stats.daily.log", + serverStatsBackupFile = Nothing } withNtfServerStoreLog :: (MonadUnliftIO m, MonadRandom m) => ATransport -> (ThreadId -> m a) -> m a