mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-04-26 21:45:15 +00:00
collect distributions over counters
This commit is contained in:
@@ -231,7 +231,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
|
||||
|
||||
rateStatsThread_ :: ServerConfig -> [M ()]
|
||||
rateStatsThread_ ServerConfig {rateStatsInterval = Just bucketWidth, logStatsInterval = Just logInterval, logStatsStartTime, rateStatsLogFile} =
|
||||
[ monitorServerRates bucketWidth, -- roll windows, collect counters, runs at a faster rate so the measurements can be used for online anomaly detection
|
||||
[ monitorServerRates (bucketWidth * 1000000), -- roll windows, collect counters, runs at a faster rate so the measurements can be used for online anomaly detection
|
||||
logServerRates logStatsStartTime logInterval rateStatsLogFile -- log distributions once in a while
|
||||
]
|
||||
rateStatsThread_ _ = []
|
||||
@@ -291,16 +291,41 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
|
||||
monitorServerRates :: Int64 -> M ()
|
||||
monitorServerRates bucketWidth = do
|
||||
labelMyThread "monitorServerRates"
|
||||
forever $ do
|
||||
stats' <- asks clientStats
|
||||
liftIO . forever $ do
|
||||
-- now <- getCurrentTime
|
||||
-- TODO: calculate delay for the next bucket closing time
|
||||
liftIO $ threadDelay' bucketWidth
|
||||
threadDelay' bucketWidth
|
||||
-- TODO: collect and reset buckets
|
||||
stats <- readTVarIO stats' >>= mapM (CS.readClientStatsData readTVarIO)
|
||||
logNote . tshow $ fmap (distribution . histogram) $ collect stats
|
||||
where
|
||||
collect :: IntMap CS.ClientStatsData -> CS.ClientStatsC (IntMap Int)
|
||||
collect stats = IM.foldlWithKey' toColumns (CS.clientStatsC IM.empty) stats
|
||||
where
|
||||
toColumns acc statsId csd =
|
||||
CS.ClientStatsC
|
||||
{ peerAddressesC = IS.size _peerAddresses +> CS.peerAddressesC acc,
|
||||
socketCountC = _socketCount +> CS.socketCountC acc,
|
||||
-- created/updated skpped
|
||||
qCreatedC = S.size _qCreated +> CS.qCreatedC acc,
|
||||
qSentSignedC = S.size _qSentSigned +> CS.qSentSignedC acc,
|
||||
msgSentSignedC = _msgSentSigned +> CS.msgSentSignedC acc,
|
||||
msgSentUnsignedC = _msgSentUnsigned +> CS.msgSentUnsignedC acc,
|
||||
msgDeliveredSignedC = _msgDeliveredSigned +> CS.msgDeliveredSignedC acc,
|
||||
proxyRelaysRequestedC = _proxyRelaysRequested +> CS.proxyRelaysRequestedC acc,
|
||||
proxyRelaysConnectedC = _proxyRelaysConnected +> CS.proxyRelaysConnectedC acc,
|
||||
msgSentViaProxyC = _msgSentViaProxy +> CS.msgSentViaProxyC acc
|
||||
}
|
||||
where
|
||||
(+>) = IM.insertWith (+) statsId
|
||||
CS.ClientStatsData {_peerAddresses, _socketCount, _qCreated, _qSentSigned, _msgSentSigned, _msgSentUnsigned, _msgDeliveredSigned, _proxyRelaysRequested, _proxyRelaysConnected, _msgSentViaProxy} = csd
|
||||
|
||||
logServerRates :: Int64 -> Int64 -> FilePath -> M ()
|
||||
logServerRates startAt logInterval statsFilePath = do
|
||||
labelMyThread "logServerStats"
|
||||
initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime
|
||||
liftIO $ putStrLn $ "server stats log enabled: " <> statsFilePath
|
||||
liftIO $ putStrLn $ "server rates log enabled: " <> statsFilePath
|
||||
liftIO $ threadDelay' $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0)
|
||||
let interval = 1000000 * logInterval
|
||||
forever $ do
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
{-# LANGUAGE DeriveFoldable #-}
|
||||
{-# LANGUAGE DeriveFunctor #-}
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
@@ -10,18 +12,16 @@ module Simplex.Messaging.Server.Stats where
|
||||
import Control.Applicative (optional, (<|>))
|
||||
import qualified Data.Attoparsec.ByteString.Char8 as A
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.Foldable (toList)
|
||||
import Data.IntMap (IntMap)
|
||||
import qualified Data.IntMap.Strict as IM
|
||||
import Data.IntPSQ (IntPSQ)
|
||||
import qualified Data.IntPSQ as IP
|
||||
import Data.Monoid (getSum)
|
||||
import Data.List (find)
|
||||
import Data.Maybe (listToMaybe)
|
||||
import Data.Set (Set)
|
||||
import qualified Data.Set as S
|
||||
import Data.Time.Calendar.Month (pattern MonthDay)
|
||||
import Data.Time.Calendar.OrdinalDate (mondayStartWeek)
|
||||
import Data.Time.Clock (NominalDiffTime, UTCTime (..))
|
||||
import Data.Time.Clock.POSIX (POSIXTime)
|
||||
import Data.Word (Word32)
|
||||
import Data.Time.Clock (UTCTime (..))
|
||||
import Simplex.Messaging.Encoding.String
|
||||
import Simplex.Messaging.Protocol (RecipientId)
|
||||
import UnliftIO.STM
|
||||
@@ -238,3 +238,47 @@ updatePeriodStats stats pId = do
|
||||
updatePeriod month
|
||||
where
|
||||
updatePeriod pSel = modifyTVar' (pSel stats) (S.insert pId)
|
||||
|
||||
-- counter -> occurences
|
||||
newtype Histogram = Histogram (IntMap Int)
|
||||
deriving (Show)
|
||||
|
||||
histogram :: Foldable t => t Int -> Histogram
|
||||
histogram = Histogram . IM.fromListWith (+) . map (,1) . toList
|
||||
{-# INLINE histogram #-}
|
||||
|
||||
distribution :: Histogram -> Distribution (Maybe Int)
|
||||
distribution h = Distribution
|
||||
{ minimal = fst <$> listToMaybe cdf',
|
||||
bottom50p = bot 0.5, -- std median
|
||||
top50p = top 0.5,
|
||||
top20p = top 0.2,
|
||||
top10p = top 0.1,
|
||||
top5p = top 0.05,
|
||||
top1p = top 0.01,
|
||||
maximal = fst <$> listToMaybe rcdf'
|
||||
}
|
||||
where
|
||||
bot p = fmap fst $ find (\(_, p') -> p' >= p) cdf'
|
||||
top p = fmap fst $ find (\(_, p') -> p' <= 1 - p) rcdf'
|
||||
cdf' = cdf h
|
||||
rcdf' = reverse cdf' -- allow find to work from the smaller end
|
||||
|
||||
cdf :: Histogram -> [(Int, Float)]
|
||||
cdf (Histogram h) = map (\(v, cc) -> (v, fromIntegral cc / total)) . scanl1 cumulative $ IM.assocs h
|
||||
where
|
||||
total :: Float
|
||||
total = fromIntegral $ sum h
|
||||
cumulative (_, acc) (v, c) = (v, acc + c)
|
||||
|
||||
data Distribution a = Distribution
|
||||
{ minimal :: a,
|
||||
bottom50p :: a,
|
||||
top50p :: a,
|
||||
top20p :: a,
|
||||
top10p :: a,
|
||||
top5p :: a,
|
||||
top1p :: a,
|
||||
maximal :: a
|
||||
}
|
||||
deriving (Show, Functor, Foldable)
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
{-# LANGUAGE DeriveFunctor #-}
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
@@ -143,3 +144,33 @@ mergeClientStatsData a b =
|
||||
_proxyRelaysConnected = _proxyRelaysConnected a + _proxyRelaysConnected b,
|
||||
_msgSentViaProxy = _msgSentViaProxy a + _msgSentViaProxy b
|
||||
}
|
||||
|
||||
-- | A column-based collection of ClientStats-related data.
|
||||
data ClientStatsC a = ClientStatsC
|
||||
{ peerAddressesC :: a,
|
||||
socketCountC :: a,
|
||||
qCreatedC :: a,
|
||||
qSentSignedC :: a,
|
||||
msgSentSignedC :: a,
|
||||
msgSentUnsignedC :: a,
|
||||
msgDeliveredSignedC :: a,
|
||||
proxyRelaysRequestedC :: a,
|
||||
proxyRelaysConnectedC :: a,
|
||||
msgSentViaProxyC :: a
|
||||
}
|
||||
deriving (Show, Functor)
|
||||
|
||||
clientStatsC :: a -> ClientStatsC a
|
||||
clientStatsC x = ClientStatsC
|
||||
{ peerAddressesC = x,
|
||||
socketCountC = x,
|
||||
qCreatedC = x,
|
||||
qSentSignedC = x,
|
||||
msgSentSignedC = x,
|
||||
msgSentUnsignedC = x,
|
||||
msgDeliveredSignedC = x,
|
||||
proxyRelaysRequestedC = x,
|
||||
proxyRelaysConnectedC = x,
|
||||
msgSentViaProxyC = x
|
||||
}
|
||||
{-# INLINE clientStatsC #-}
|
||||
|
||||
@@ -7,23 +7,15 @@
|
||||
|
||||
module Simplex.Messaging.Server.Stats.Timeline where
|
||||
|
||||
import Control.Applicative (optional, (<|>))
|
||||
import qualified Data.Attoparsec.ByteString.Char8 as A
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.IntMap (IntMap)
|
||||
import qualified Data.IntMap.Strict as IM
|
||||
import Data.IntPSQ (IntPSQ)
|
||||
import qualified Data.IntPSQ as IP
|
||||
import Data.Monoid (getSum)
|
||||
import Data.Set (Set)
|
||||
import qualified Data.Set as S
|
||||
import Data.Time.Calendar.Month (pattern MonthDay)
|
||||
import Data.Time.Calendar.OrdinalDate (mondayStartWeek)
|
||||
import Data.Time.Clock (NominalDiffTime, UTCTime (..))
|
||||
import Data.List (find, sortOn)
|
||||
import Data.Maybe (listToMaybe)
|
||||
import Data.Time.Clock (NominalDiffTime)
|
||||
import Data.Time.Clock.POSIX (POSIXTime)
|
||||
import Data.Word (Word32)
|
||||
import Simplex.Messaging.Encoding.String
|
||||
import Simplex.Messaging.Protocol (RecipientId)
|
||||
import UnliftIO.STM
|
||||
|
||||
-- A time series of counters with an active head
|
||||
@@ -66,24 +58,3 @@ type WindowData = IntMap Int -- PeerId -> counter
|
||||
|
||||
window :: BucketId -> BucketId -> SparseSeries -> WindowData
|
||||
window = error "TODO: pick elements inside the range and drop bucket ids"
|
||||
|
||||
-- counter -> occurences
|
||||
type Histogram = IntMap Int
|
||||
|
||||
histogram :: WindowData -> Histogram
|
||||
histogram = fmap getSum . IM.fromListWith (<>) . map (,1) . IM.elems
|
||||
|
||||
distribution :: Histogram -> Distribution Int
|
||||
distribution = error "TODO: unroll histogram, sample elements at percentiles"
|
||||
|
||||
data Distribution a = Distribution
|
||||
{ minimal :: a,
|
||||
bottom50p :: a,
|
||||
top50p :: a,
|
||||
top20p :: a,
|
||||
top10p :: a,
|
||||
top5p :: a,
|
||||
top1p :: a,
|
||||
maximal :: a
|
||||
}
|
||||
deriving (Show)
|
||||
|
||||
Reference in New Issue
Block a user