From f884ecc3ab57c8447bc0e71304858792ba88872d Mon Sep 17 00:00:00 2001 From: Alexander Bondarenko <486682+dpwiz@users.noreply.github.com> Date: Wed, 15 May 2024 23:39:57 +0300 Subject: [PATCH] collect distributions over counters --- src/Simplex/Messaging/Server.hs | 33 +++++++++-- src/Simplex/Messaging/Server/Stats.hs | 56 +++++++++++++++++-- src/Simplex/Messaging/Server/Stats/Client.hs | 31 ++++++++++ .../Messaging/Server/Stats/Timeline.hs | 35 +----------- 4 files changed, 113 insertions(+), 42 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index a977025f6..e0e98ef3d 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -231,7 +231,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do rateStatsThread_ :: ServerConfig -> [M ()] rateStatsThread_ ServerConfig {rateStatsInterval = Just bucketWidth, logStatsInterval = Just logInterval, logStatsStartTime, rateStatsLogFile} = - [ monitorServerRates bucketWidth, -- roll windows, collect counters, runs at a faster rate so the measurements can be used for online anomaly detection + [ monitorServerRates (bucketWidth * 1000000), -- roll windows, collect counters, runs at a faster rate so the measurements can be used for online anomaly detection logServerRates logStatsStartTime logInterval rateStatsLogFile -- log distributions once in a while ] rateStatsThread_ _ = [] @@ -291,16 +291,41 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do monitorServerRates :: Int64 -> M () monitorServerRates bucketWidth = do labelMyThread "monitorServerRates" - forever $ do + stats' <- asks clientStats + liftIO . forever $ do + -- now <- getCurrentTime -- TODO: calculate delay for the next bucket closing time - liftIO $ threadDelay' bucketWidth + threadDelay' bucketWidth -- TODO: collect and reset buckets + stats <- readTVarIO stats' >>= mapM (CS.readClientStatsData readTVarIO) + logNote . 
tshow $ fmap (distribution . histogram) $ collect stats + where + collect :: IntMap CS.ClientStatsData -> CS.ClientStatsC (IntMap Int) + collect stats = IM.foldlWithKey' toColumns (CS.clientStatsC IM.empty) stats + where + toColumns acc statsId csd = + CS.ClientStatsC + { peerAddressesC = IS.size _peerAddresses +> CS.peerAddressesC acc, + socketCountC = _socketCount +> CS.socketCountC acc, + -- created/updated skipped + qCreatedC = S.size _qCreated +> CS.qCreatedC acc, + qSentSignedC = S.size _qSentSigned +> CS.qSentSignedC acc, + msgSentSignedC = _msgSentSigned +> CS.msgSentSignedC acc, + msgSentUnsignedC = _msgSentUnsigned +> CS.msgSentUnsignedC acc, + msgDeliveredSignedC = _msgDeliveredSigned +> CS.msgDeliveredSignedC acc, + proxyRelaysRequestedC = _proxyRelaysRequested +> CS.proxyRelaysRequestedC acc, + proxyRelaysConnectedC = _proxyRelaysConnected +> CS.proxyRelaysConnectedC acc, + msgSentViaProxyC = _msgSentViaProxy +> CS.msgSentViaProxyC acc + } + where + (+>) = IM.insertWith (+) statsId + CS.ClientStatsData {_peerAddresses, _socketCount, _qCreated, _qSentSigned, _msgSentSigned, _msgSentUnsigned, _msgDeliveredSigned, _proxyRelaysRequested, _proxyRelaysConnected, _msgSentViaProxy} = csd logServerRates :: Int64 -> Int64 -> FilePath -> M () logServerRates startAt logInterval statsFilePath = do labelMyThread "logServerStats" initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . 
utctDayTime <$> liftIO getCurrentTime - liftIO $ putStrLn $ "server stats log enabled: " <> statsFilePath + liftIO $ putStrLn $ "server rates log enabled: " <> statsFilePath liftIO $ threadDelay' $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0) let interval = 1000000 * logInterval forever $ do diff --git a/src/Simplex/Messaging/Server/Stats.hs b/src/Simplex/Messaging/Server/Stats.hs index 53f977f8f..ece54cfcf 100644 --- a/src/Simplex/Messaging/Server/Stats.hs +++ b/src/Simplex/Messaging/Server/Stats.hs @@ -1,3 +1,5 @@ +{-# LANGUAGE DeriveFoldable #-} +{-# LANGUAGE DeriveFunctor #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE OverloadedStrings #-} @@ -10,18 +12,16 @@ module Simplex.Messaging.Server.Stats where import Control.Applicative (optional, (<|>)) import qualified Data.Attoparsec.ByteString.Char8 as A import qualified Data.ByteString.Char8 as B +import Data.Foldable (toList) import Data.IntMap (IntMap) import qualified Data.IntMap.Strict as IM -import Data.IntPSQ (IntPSQ) -import qualified Data.IntPSQ as IP -import Data.Monoid (getSum) +import Data.List (find) +import Data.Maybe (listToMaybe) import Data.Set (Set) import qualified Data.Set as S import Data.Time.Calendar.Month (pattern MonthDay) import Data.Time.Calendar.OrdinalDate (mondayStartWeek) -import Data.Time.Clock (NominalDiffTime, UTCTime (..)) -import Data.Time.Clock.POSIX (POSIXTime) -import Data.Word (Word32) +import Data.Time.Clock (UTCTime (..)) import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol (RecipientId) import UnliftIO.STM @@ -238,3 +238,47 @@ updatePeriodStats stats pId = do updatePeriod month where updatePeriod pSel = modifyTVar' (pSel stats) (S.insert pId) + +-- counter -> occurrences +newtype Histogram = Histogram (IntMap Int) + deriving (Show) + +histogram :: Foldable t => t Int -> Histogram +histogram = Histogram . IM.fromListWith (+) . map (,1) . 
toList +{-# INLINE histogram #-} + +distribution :: Histogram -> Distribution (Maybe Int) +distribution h = Distribution + { minimal = fst <$> listToMaybe cdf', + bottom50p = bot 0.5, -- std median + top50p = top 0.5, + top20p = top 0.2, + top10p = top 0.1, + top5p = top 0.05, + top1p = top 0.01, + maximal = fst <$> listToMaybe rcdf' + } + where + bot p = fmap fst $ find (\(_, p') -> p' >= p) cdf' + top p = fmap fst $ find (\(_, p') -> p' <= 1 - p) rcdf' + cdf' = cdf h + rcdf' = reverse cdf' -- allow find to work from the smaller end + +cdf :: Histogram -> [(Int, Float)] +cdf (Histogram h) = map (\(v, cc) -> (v, fromIntegral cc / total)) . scanl1 cumulative $ IM.assocs h + where + total :: Float + total = fromIntegral $ sum h + cumulative (_, acc) (v, c) = (v, acc + c) + +data Distribution a = Distribution + { minimal :: a, + bottom50p :: a, + top50p :: a, + top20p :: a, + top10p :: a, + top5p :: a, + top1p :: a, + maximal :: a + } + deriving (Show, Functor, Foldable) diff --git a/src/Simplex/Messaging/Server/Stats/Client.hs b/src/Simplex/Messaging/Server/Stats/Client.hs index 095f9f49c..82c86f326 100644 --- a/src/Simplex/Messaging/Server/Stats/Client.hs +++ b/src/Simplex/Messaging/Server/Stats/Client.hs @@ -1,3 +1,4 @@ +{-# LANGUAGE DeriveFunctor #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE OverloadedStrings #-} @@ -143,3 +144,33 @@ mergeClientStatsData a b = _proxyRelaysConnected = _proxyRelaysConnected a + _proxyRelaysConnected b, _msgSentViaProxy = _msgSentViaProxy a + _msgSentViaProxy b } + +-- | A column-based collection of ClientStats-related data. 
+data ClientStatsC a = ClientStatsC + { peerAddressesC :: a, + socketCountC :: a, + qCreatedC :: a, + qSentSignedC :: a, + msgSentSignedC :: a, + msgSentUnsignedC :: a, + msgDeliveredSignedC :: a, + proxyRelaysRequestedC :: a, + proxyRelaysConnectedC :: a, + msgSentViaProxyC :: a + } + deriving (Show, Functor) + +clientStatsC :: a -> ClientStatsC a +clientStatsC x = ClientStatsC + { peerAddressesC = x, + socketCountC = x, + qCreatedC = x, + qSentSignedC = x, + msgSentSignedC = x, + msgSentUnsignedC = x, + msgDeliveredSignedC = x, + proxyRelaysRequestedC = x, + proxyRelaysConnectedC = x, + msgSentViaProxyC = x + } +{-# INLINE clientStatsC #-} diff --git a/src/Simplex/Messaging/Server/Stats/Timeline.hs b/src/Simplex/Messaging/Server/Stats/Timeline.hs index 77bf7ba97..504f82f08 100644 --- a/src/Simplex/Messaging/Server/Stats/Timeline.hs +++ b/src/Simplex/Messaging/Server/Stats/Timeline.hs @@ -7,23 +7,15 @@ module Simplex.Messaging.Server.Stats.Timeline where -import Control.Applicative (optional, (<|>)) -import qualified Data.Attoparsec.ByteString.Char8 as A -import qualified Data.ByteString.Char8 as B import Data.IntMap (IntMap) import qualified Data.IntMap.Strict as IM import Data.IntPSQ (IntPSQ) import qualified Data.IntPSQ as IP -import Data.Monoid (getSum) -import Data.Set (Set) -import qualified Data.Set as S -import Data.Time.Calendar.Month (pattern MonthDay) -import Data.Time.Calendar.OrdinalDate (mondayStartWeek) -import Data.Time.Clock (NominalDiffTime, UTCTime (..)) +import Data.List (find, sortOn) +import Data.Maybe (listToMaybe) +import Data.Time.Clock (NominalDiffTime) import Data.Time.Clock.POSIX (POSIXTime) import Data.Word (Word32) -import Simplex.Messaging.Encoding.String -import Simplex.Messaging.Protocol (RecipientId) import UnliftIO.STM -- A time series of counters with an active head @@ -66,24 +58,3 @@ type WindowData = IntMap Int -- PeerId -> counter window :: BucketId -> BucketId -> SparseSeries -> WindowData window = error "TODO: pick elements 
inside the range and drop bucket ids" - --- counter -> occurences -type Histogram = IntMap Int - -histogram :: WindowData -> Histogram -histogram = fmap getSum . IM.fromListWith (<>) . map (,1) . IM.elems - -distribution :: Histogram -> Distribution Int -distribution = error "TODO: unroll histogram, sample elements at percentiles" - -data Distribution a = Distribution - { minimal :: a, - bottom50p :: a, - top50p :: a, - top20p :: a, - top10p :: a, - top5p :: a, - top1p :: a, - maximal :: a - } - deriving (Show)