diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index e0e98ef3d..cf0ffd4bc 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -61,7 +61,7 @@ import qualified Data.List.NonEmpty as L import qualified Data.Map.Strict as M import Data.Set (Set) import qualified Data.Set as S -import Data.Maybe (isNothing) +import Data.Maybe (isNothing, listToMaybe) import qualified Data.Text as T import Data.Text.Encoding (decodeLatin1) import Data.Time.Clock (UTCTime (..), diffTimeToPicoseconds, getCurrentTime) @@ -96,12 +96,13 @@ import Simplex.Messaging.Transport.Server import Simplex.Messaging.Util import Simplex.Messaging.Version import System.Exit (exitFailure) +import System.FilePath (takeDirectory) import System.IO (hPrint, hPutStrLn, hSetNewlineMode, universalNewlineMode) import System.Mem.Weak (deRefWeak) import UnliftIO (timeout) import UnliftIO.Async (mapConcurrently) import UnliftIO.Concurrent -import UnliftIO.Directory (doesFileExist, renameFile) +import UnliftIO.Directory (createDirectoryIfMissing, doesFileExist, renameFile) import UnliftIO.Exception import UnliftIO.IO import UnliftIO.STM @@ -230,9 +231,9 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do serverStatsThread_ _ = [] rateStatsThread_ :: ServerConfig -> [M ()] - rateStatsThread_ ServerConfig {rateStatsInterval = Just bucketWidth, logStatsInterval = Just logInterval, logStatsStartTime, rateStatsLogFile} = - [ monitorServerRates (bucketWidth * 1000000), -- roll windows, collect counters, runs at a faster rate so the measurements can be used for online anomaly detection - logServerRates logStatsStartTime logInterval rateStatsLogFile -- log distributions once in a while + rateStatsThread_ ServerConfig {rateStatsLength = nBuckets, rateStatsInterval = Just bucketWidth, logStatsInterval = Just logInterval, logStatsStartTime, rateStatsLogFile} = + [ monitorServerRates nBuckets (bucketWidth * 1000000), -- roll windows, collect counters, runs at a faster rate so the measurements can be used for online anomaly detection + logServerRates logStatsStartTime logInterval rateStatsLogFile -- log current distributions once in a while ] rateStatsThread_ _ = [] @@ -288,17 +289,19 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do ] liftIO $ threadDelay' interval - monitorServerRates :: Int64 -> M () - monitorServerRates bucketWidth = do + monitorServerRates :: Int -> Int64 -> M () + monitorServerRates nBuckets bucketWidth = do labelMyThread "monitorServerRates" stats' <- asks clientStats + rates' <- asks serverRates liftIO . forever $ do -- now <- getCurrentTime -- TODO: calculate delay for the next bucket closing time threadDelay' bucketWidth -- TODO: collect and reset buckets stats <- readTVarIO stats' >>= mapM (CS.readClientStatsData readTVarIO) - logNote . tshow $ fmap (distribution . histogram) $ collect stats + let !rates = distribution . histogram <$> collect stats + atomically . modifyTVar' rates' $ (rates :) . take nBuckets where collect :: IntMap CS.ClientStatsData -> CS.ClientStatsC (IntMap Int) collect stats = IM.foldlWithKey' toColumns (CS.clientStatsC IM.empty) stats @@ -324,13 +327,26 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do logServerRates :: Int64 -> Int64 -> FilePath -> M () logServerRates startAt logInterval statsFilePath = do labelMyThread "logServerStats" + liftIO . unlessM (doesFileExist statsFilePath) $ do + createDirectoryIfMissing True (takeDirectory statsFilePath) + B.writeFile statsFilePath $ B.intercalate "," csvLabels <> "\n" initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime liftIO $ putStrLn $ "server rates log enabled: " <> statsFilePath liftIO $ threadDelay' $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0) let interval = 1000000 * logInterval - forever $ do + rates' <- asks serverRates + liftIO . forever $ do -- write the thing - liftIO $ threadDelay' interval + threadDelay' interval + rates <- readTVarIO rates' + forM_ (listToMaybe rates) $ \cs -> do + ts <- getCurrentTime + let values = concatMap (concatMap $ pure . maybe "0" bshow) cs + withFile statsFilePath AppendMode $ \h -> liftIO $ do + hSetBuffering h LineBuffering + B.hPut h $ B.intercalate "," (strEncode ts : values) <> "\n" + where + csvLabels = "ts" : concatMap (\s -> concatMap (\d -> [s <> "." <> d]) distributionLabels) CS.clientStatsLabels runClient :: Transport c => C.APrivateSignKey -> TProxy c -> c -> M () runClient signKey tp h = do diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index 2f02d78c5..c33b3abd8 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -32,7 +32,7 @@ import Simplex.Messaging.Server.MsgStore.STM import Simplex.Messaging.Server.QueueStore (NtfCreds (..), QueueRec (..)) import Simplex.Messaging.Server.QueueStore.STM import Simplex.Messaging.Server.Stats -import Simplex.Messaging.Server.Stats.Client (ClientStats, ClientStatsId) +import Simplex.Messaging.Server.Stats.Client (ClientStats, ClientStatsC, ClientStatsId) import Simplex.Messaging.Server.Stats.Timeline (Timeline, newTimeline, perMinute) import Simplex.Messaging.Server.StoreLog import Simplex.Messaging.TMap (TMap) @@ -76,6 +76,8 @@ data ServerConfig = ServerConfig serverStatsBackupFile :: Maybe FilePath, -- | rate limit monitoring interval / bucket width, seconds rateStatsInterval :: Maybe Int64, + -- | number of rate limit samples to keep + rateStatsLength :: Int, rateStatsLogFile :: FilePath, rateStatsBackupFile :: Maybe FilePath, -- | CA certificate private key is not needed for initialization @@ -124,6 +126,7 @@ data Env = Env clientStats :: TVar (IntMap ClientStats), -- transitive session stats statsClients :: TVar (IntMap ClientStatsId), -- reverse index from sockets sendSignedClients :: TMap RecipientId (TVar ClientStatsId), -- reverse index from queues to their senders + serverRates :: TVar [ClientStatsC (Distribution (Maybe Int))], -- current (head) + historical distributions extracted from clientStats for logging and assessing ClientStatsData deviations sockets :: SocketState, clientSeq :: TVar ClientId, clients :: TVar (IntMap Client), @@ -219,7 +222,8 @@ newEnv config@ServerConfig {caCertificateFile, certificateFile, privateKeyFile, clientStats <- newTVarIO mempty statsClients <- newTVarIO mempty sendSignedClients <- newTVarIO mempty - return Env {config, server, serverIdentity, queueStore, msgStore, random, storeLog, tlsServerParams, serverStats, sockets, clientSeq, clients, proxyAgent, qCreatedByIp, msgSentByIp, clientStats, statsClients, sendSignedClients} + serverRates <- newTVarIO mempty + return Env {config, server, serverIdentity, queueStore, msgStore, random, storeLog, tlsServerParams, serverStats, sockets, clientSeq, clients, proxyAgent, qCreatedByIp, msgSentByIp, clientStats, statsClients, sendSignedClients, serverRates} where restoreQueues :: QueueStore -> FilePath -> IO (StoreLog 'WriteMode) restoreQueues QueueStore {queues, senders, notifiers} f = do diff --git a/src/Simplex/Messaging/Server/Main.hs b/src/Simplex/Messaging/Server/Main.hs index 5ebb82ebb..2e4aba88a 100644 --- a/src/Simplex/Messaging/Server/Main.hs +++ b/src/Simplex/Messaging/Server/Main.hs @@ -225,6 +225,7 @@ smpServerCLI cfgPath logPath = serverStatsLogFile = combine logPath "smp-server-stats.daily.log", serverStatsBackupFile = logStats $> combine logPath "smp-server-stats.log", rateStatsInterval = Just 60, -- TODO: add to options + rateStatsLength = 0, -- Just (24 * 60), -- TODO: add to options rateStatsLogFile = combine logPath "smp-server-rates.daily.log", rateStatsBackupFile = Just $ combine logPath "smp-server-rates.log", smpServerVRange = supportedServerSMPRelayVRange, diff --git a/src/Simplex/Messaging/Server/Stats.hs b/src/Simplex/Messaging/Server/Stats.hs index ece54cfcf..d6db73188 100644 --- a/src/Simplex/Messaging/Server/Stats.hs +++ b/src/Simplex/Messaging/Server/Stats.hs @@ -1,5 +1,5 @@ -{-# LANGUAGE DeriveFoldable #-} {-# LANGUAGE DeriveFunctor #-} +{-# LANGUAGE DeriveTraversable #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE OverloadedStrings #-} @@ -19,6 +19,7 @@ import Data.List (find) import Data.Maybe (listToMaybe) import Data.Set (Set) import qualified Data.Set as S +import Data.String (IsString) import Data.Time.Calendar.Month (pattern MonthDay) import Data.Time.Calendar.OrdinalDate (mondayStartWeek) import Data.Time.Clock (UTCTime (..)) @@ -248,16 +249,17 @@ histogram = Histogram . IM.fromListWith (+) . map (,1) . toList {-# INLINE histogram #-} distribution :: Histogram -> Distribution (Maybe Int) -distribution h = Distribution - { minimal = fst <$> listToMaybe cdf', - bottom50p = bot 0.5, -- std median - top50p = top 0.5, - top20p = top 0.2, - top10p = top 0.1, - top5p = top 0.05, - top1p = top 0.01, - maximal = fst <$> listToMaybe rcdf' - } +distribution h = + Distribution + { minimal = fst <$> listToMaybe cdf', + bottom50p = bot 0.5, -- std median + top50p = top 0.5, + top20p = top 0.2, + top10p = top 0.1, + top5p = top 0.05, + top1p = top 0.01, + maximal = fst <$> listToMaybe rcdf' + } where bot p = fmap fst $ find (\(_, p') -> p' >= p) cdf' top p = fmap fst $ find (\(_, p') -> p' <= 1 - p) rcdf' @@ -281,4 +283,17 @@ data Distribution a = Distribution top1p :: a, maximal :: a } - deriving (Show, Functor, Foldable) + deriving (Show, Functor, Foldable, Traversable) + +distributionLabels :: IsString a => Distribution a +distributionLabels = + Distribution + { minimal = "minimal", + bottom50p = "bottom50p", + top50p = "top50p", + top20p = "top20p", + top10p = "top10p", + top5p = "top5p", + top1p = "top1p", + maximal = "maximal" + } diff --git a/src/Simplex/Messaging/Server/Stats/Client.hs b/src/Simplex/Messaging/Server/Stats/Client.hs index 82c86f326..a45984011 100644 --- a/src/Simplex/Messaging/Server/Stats/Client.hs +++ b/src/Simplex/Messaging/Server/Stats/Client.hs @@ -1,4 +1,5 @@ {-# LANGUAGE DeriveFunctor #-} +{-# LANGUAGE DeriveTraversable #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE OverloadedStrings #-} @@ -12,6 +13,7 @@ module Simplex.Messaging.Server.Stats.Client where import Data.IntSet (IntSet) import qualified Data.IntSet as IS import Data.Set (Set) +import Data.String (IsString) import Data.Time.Clock (UTCTime (..)) import Simplex.Messaging.Protocol (RecipientId) import Simplex.Messaging.Transport (PeerId) @@ -158,19 +160,36 @@ data ClientStatsC a = ClientStatsC proxyRelaysConnectedC :: a, msgSentViaProxyC :: a } - deriving (Show, Functor) + deriving (Show, Functor, Foldable, Traversable) clientStatsC :: a -> ClientStatsC a -clientStatsC x = ClientStatsC - { peerAddressesC = x, - socketCountC = x, - qCreatedC = x, - qSentSignedC = x, - msgSentSignedC = x, - msgSentUnsignedC = x, - msgDeliveredSignedC = x, - proxyRelaysRequestedC = x, - proxyRelaysConnectedC = x, - msgSentViaProxyC = x - } +clientStatsC x = + ClientStatsC + { peerAddressesC = x, + socketCountC = x, + qCreatedC = x, + qSentSignedC = x, + msgSentSignedC = x, + msgSentUnsignedC = x, + msgDeliveredSignedC = x, + proxyRelaysRequestedC = x, + proxyRelaysConnectedC = x, + msgSentViaProxyC = x + } {-# INLINE clientStatsC #-} + +clientStatsLabels :: IsString a => ClientStatsC a +clientStatsLabels = + ClientStatsC + { peerAddressesC = "peerAddresses", + socketCountC = "socketCount", + qCreatedC = "qCreated", + qSentSignedC = "qSentSigned", + msgSentSignedC = "msgSentSigned", + msgSentUnsignedC = "msgSentUnsigned", + msgDeliveredSignedC = "msgDeliveredSigned", + proxyRelaysRequestedC = "proxyRelaysRequested", + proxyRelaysConnectedC = "proxyRelaysConnected", + msgSentViaProxyC = "msgSentViaProxy" + } +{-# INLINE clientStatsLabels #-} diff --git a/tests/SMPClient.hs b/tests/SMPClient.hs index 0affc4d88..d79ce2e73 100644 --- a/tests/SMPClient.hs +++ b/tests/SMPClient.hs @@ -113,6 +113,7 @@ cfg = serverStatsLogFile = "tests/smp-server-stats.daily.log", serverStatsBackupFile = Nothing, rateStatsInterval = Nothing, + rateStatsLength = 0, rateStatsLogFile = "", rateStatsBackupFile = Nothing, caCertificateFile = "tests/fixtures/ca.crt",