write distributions to files

This commit is contained in:
Alexander Bondarenko
2024-05-16 21:00:05 +03:00
parent 4c7ab32386
commit ba7bcef399
6 changed files with 93 additions and 37 deletions
+26 -10
View File
@@ -61,7 +61,7 @@ import qualified Data.List.NonEmpty as L
import qualified Data.Map.Strict as M
import Data.Set (Set)
import qualified Data.Set as S
import Data.Maybe (isNothing)
import Data.Maybe (isNothing, listToMaybe)
import qualified Data.Text as T
import Data.Text.Encoding (decodeLatin1)
import Data.Time.Clock (UTCTime (..), diffTimeToPicoseconds, getCurrentTime)
@@ -96,12 +96,13 @@ import Simplex.Messaging.Transport.Server
import Simplex.Messaging.Util
import Simplex.Messaging.Version
import System.Exit (exitFailure)
import System.FilePath (takeDirectory)
import System.IO (hPrint, hPutStrLn, hSetNewlineMode, universalNewlineMode)
import System.Mem.Weak (deRefWeak)
import UnliftIO (timeout)
import UnliftIO.Async (mapConcurrently)
import UnliftIO.Concurrent
import UnliftIO.Directory (doesFileExist, renameFile)
import UnliftIO.Directory (createDirectoryIfMissing, doesFileExist, renameFile)
import UnliftIO.Exception
import UnliftIO.IO
import UnliftIO.STM
@@ -230,9 +231,9 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
serverStatsThread_ _ = []
rateStatsThread_ :: ServerConfig -> [M ()]
rateStatsThread_ ServerConfig {rateStatsInterval = Just bucketWidth, logStatsInterval = Just logInterval, logStatsStartTime, rateStatsLogFile} =
[ monitorServerRates (bucketWidth * 1000000), -- roll windows, collect counters, runs at a faster rate so the measurements can be used for online anomaly detection
logServerRates logStatsStartTime logInterval rateStatsLogFile -- log distributions once in a while
rateStatsThread_ ServerConfig {rateStatsLength = nBuckets, rateStatsInterval = Just bucketWidth, logStatsInterval = Just logInterval, logStatsStartTime, rateStatsLogFile} =
[ monitorServerRates nBuckets (bucketWidth * 1000000), -- roll windows, collect counters, runs at a faster rate so the measurements can be used for online anomaly detection
logServerRates logStatsStartTime logInterval rateStatsLogFile -- log current distributions once in a while
]
rateStatsThread_ _ = []
@@ -288,17 +289,19 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
]
liftIO $ threadDelay' interval
monitorServerRates :: Int64 -> M ()
monitorServerRates bucketWidth = do
monitorServerRates :: Int -> Int64 -> M ()
monitorServerRates nBuckets bucketWidth = do
labelMyThread "monitorServerRates"
stats' <- asks clientStats
rates' <- asks serverRates
liftIO . forever $ do
-- now <- getCurrentTime
-- TODO: calculate delay for the next bucket closing time
threadDelay' bucketWidth
-- TODO: collect and reset buckets
stats <- readTVarIO stats' >>= mapM (CS.readClientStatsData readTVarIO)
logNote . tshow $ fmap (distribution . histogram) $ collect stats
let !rates = distribution . histogram <$> collect stats
atomically . modifyTVar' rates' $ (rates :) . take nBuckets
where
collect :: IntMap CS.ClientStatsData -> CS.ClientStatsC (IntMap Int)
collect stats = IM.foldlWithKey' toColumns (CS.clientStatsC IM.empty) stats
@@ -324,13 +327,26 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
logServerRates :: Int64 -> Int64 -> FilePath -> M ()
logServerRates startAt logInterval statsFilePath = do
labelMyThread "logServerStats"
liftIO . unlessM (doesFileExist statsFilePath) $ do
createDirectoryIfMissing True (takeDirectory statsFilePath)
B.writeFile statsFilePath $ B.intercalate "," csvLabels <> "\n"
initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime
liftIO $ putStrLn $ "server rates log enabled: " <> statsFilePath
liftIO $ threadDelay' $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0)
let interval = 1000000 * logInterval
forever $ do
rates' <- asks serverRates
liftIO . forever $ do
-- write the thing
liftIO $ threadDelay' interval
threadDelay' interval
rates <- readTVarIO rates'
forM_ (listToMaybe rates) $ \cs -> do
ts <- getCurrentTime
let values = concatMap (concatMap $ pure . maybe "0" bshow) cs
withFile statsFilePath AppendMode $ \h -> liftIO $ do
hSetBuffering h LineBuffering
B.hPut h $ B.intercalate "," (strEncode ts : values) <> "\n"
where
csvLabels = "ts" : concatMap (\s -> concatMap (\d -> [s <> "." <> d]) distributionLabels) CS.clientStatsLabels
runClient :: Transport c => C.APrivateSignKey -> TProxy c -> c -> M ()
runClient signKey tp h = do
+6 -2
View File
@@ -32,7 +32,7 @@ import Simplex.Messaging.Server.MsgStore.STM
import Simplex.Messaging.Server.QueueStore (NtfCreds (..), QueueRec (..))
import Simplex.Messaging.Server.QueueStore.STM
import Simplex.Messaging.Server.Stats
import Simplex.Messaging.Server.Stats.Client (ClientStats, ClientStatsId)
import Simplex.Messaging.Server.Stats.Client (ClientStats, ClientStatsC, ClientStatsId)
import Simplex.Messaging.Server.Stats.Timeline (Timeline, newTimeline, perMinute)
import Simplex.Messaging.Server.StoreLog
import Simplex.Messaging.TMap (TMap)
@@ -76,6 +76,8 @@ data ServerConfig = ServerConfig
serverStatsBackupFile :: Maybe FilePath,
-- | rate limit monitoring interval / bucket width, seconds
rateStatsInterval :: Maybe Int64,
-- | number of rate limit samples to keep
rateStatsLength :: Int,
rateStatsLogFile :: FilePath,
rateStatsBackupFile :: Maybe FilePath,
-- | CA certificate private key is not needed for initialization
@@ -124,6 +126,7 @@ data Env = Env
clientStats :: TVar (IntMap ClientStats), -- transitive session stats
statsClients :: TVar (IntMap ClientStatsId), -- reverse index from sockets
sendSignedClients :: TMap RecipientId (TVar ClientStatsId), -- reverse index from queues to their senders
serverRates :: TVar [ClientStatsC (Distribution (Maybe Int))], -- current (head) + historical distributions extracted from clientStats for logging and assessing ClientStatsData deviations
sockets :: SocketState,
clientSeq :: TVar ClientId,
clients :: TVar (IntMap Client),
@@ -219,7 +222,8 @@ newEnv config@ServerConfig {caCertificateFile, certificateFile, privateKeyFile,
clientStats <- newTVarIO mempty
statsClients <- newTVarIO mempty
sendSignedClients <- newTVarIO mempty
return Env {config, server, serverIdentity, queueStore, msgStore, random, storeLog, tlsServerParams, serverStats, sockets, clientSeq, clients, proxyAgent, qCreatedByIp, msgSentByIp, clientStats, statsClients, sendSignedClients}
serverRates <- newTVarIO mempty
return Env {config, server, serverIdentity, queueStore, msgStore, random, storeLog, tlsServerParams, serverStats, sockets, clientSeq, clients, proxyAgent, qCreatedByIp, msgSentByIp, clientStats, statsClients, sendSignedClients, serverRates}
where
restoreQueues :: QueueStore -> FilePath -> IO (StoreLog 'WriteMode)
restoreQueues QueueStore {queues, senders, notifiers} f = do
+1
View File
@@ -225,6 +225,7 @@ smpServerCLI cfgPath logPath =
serverStatsLogFile = combine logPath "smp-server-stats.daily.log",
serverStatsBackupFile = logStats $> combine logPath "smp-server-stats.log",
rateStatsInterval = Just 60, -- TODO: add to options
rateStatsLength = 0, -- Just (24 * 60), -- TODO: add to options
rateStatsLogFile = combine logPath "smp-server-rates.daily.log",
rateStatsBackupFile = Just $ combine logPath "smp-server-rates.log",
smpServerVRange = supportedServerSMPRelayVRange,
+27 -12
View File
@@ -1,5 +1,5 @@
{-# LANGUAGE DeriveFoldable #-}
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE DeriveTraversable #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
@@ -19,6 +19,7 @@ import Data.List (find)
import Data.Maybe (listToMaybe)
import Data.Set (Set)
import qualified Data.Set as S
import Data.String (IsString)
import Data.Time.Calendar.Month (pattern MonthDay)
import Data.Time.Calendar.OrdinalDate (mondayStartWeek)
import Data.Time.Clock (UTCTime (..))
@@ -248,16 +249,17 @@ histogram = Histogram . IM.fromListWith (+) . map (,1) . toList
{-# INLINE histogram #-}
distribution :: Histogram -> Distribution (Maybe Int)
distribution h = Distribution
{ minimal = fst <$> listToMaybe cdf',
bottom50p = bot 0.5, -- std median
top50p = top 0.5,
top20p = top 0.2,
top10p = top 0.1,
top5p = top 0.05,
top1p = top 0.01,
maximal = fst <$> listToMaybe rcdf'
}
distribution h =
Distribution
{ minimal = fst <$> listToMaybe cdf',
bottom50p = bot 0.5, -- std median
top50p = top 0.5,
top20p = top 0.2,
top10p = top 0.1,
top5p = top 0.05,
top1p = top 0.01,
maximal = fst <$> listToMaybe rcdf'
}
where
bot p = fmap fst $ find (\(_, p') -> p' >= p) cdf'
top p = fmap fst $ find (\(_, p') -> p' <= 1 - p) rcdf'
@@ -281,4 +283,17 @@ data Distribution a = Distribution
top1p :: a,
maximal :: a
}
deriving (Show, Functor, Foldable)
deriving (Show, Functor, Foldable, Traversable)
distributionLabels :: IsString a => Distribution a
distributionLabels =
Distribution
{ minimal = "minimal",
bottom50p = "bottom50p",
top50p = "top50p",
top20p = "top20p",
top10p = "top10p",
top5p = "top5p",
top1p = "top1p",
maximal = "maximal"
}
+32 -13
View File
@@ -1,4 +1,5 @@
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE DeriveTraversable #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
@@ -12,6 +13,7 @@ module Simplex.Messaging.Server.Stats.Client where
import Data.IntSet (IntSet)
import qualified Data.IntSet as IS
import Data.Set (Set)
import Data.String (IsString)
import Data.Time.Clock (UTCTime (..))
import Simplex.Messaging.Protocol (RecipientId)
import Simplex.Messaging.Transport (PeerId)
@@ -158,19 +160,36 @@ data ClientStatsC a = ClientStatsC
proxyRelaysConnectedC :: a,
msgSentViaProxyC :: a
}
deriving (Show, Functor)
deriving (Show, Functor, Foldable, Traversable)
clientStatsC :: a -> ClientStatsC a
clientStatsC x = ClientStatsC
{ peerAddressesC = x,
socketCountC = x,
qCreatedC = x,
qSentSignedC = x,
msgSentSignedC = x,
msgSentUnsignedC = x,
msgDeliveredSignedC = x,
proxyRelaysRequestedC = x,
proxyRelaysConnectedC = x,
msgSentViaProxyC = x
}
clientStatsC x =
ClientStatsC
{ peerAddressesC = x,
socketCountC = x,
qCreatedC = x,
qSentSignedC = x,
msgSentSignedC = x,
msgSentUnsignedC = x,
msgDeliveredSignedC = x,
proxyRelaysRequestedC = x,
proxyRelaysConnectedC = x,
msgSentViaProxyC = x
}
{-# INLINE clientStatsC #-}
clientStatsLabels :: IsString a => ClientStatsC a
clientStatsLabels =
ClientStatsC
{ peerAddressesC = "peerAddresses",
socketCountC = "socketCount",
qCreatedC = "qCreated",
qSentSignedC = "qSentSigned",
msgSentSignedC = "msgSentSigned",
msgSentUnsignedC = "msgSentUnsigned",
msgDeliveredSignedC = "msgDeliveredSigned",
proxyRelaysRequestedC = "proxyRelaysRequested",
proxyRelaysConnectedC = "proxyRelaysConnected",
msgSentViaProxyC = "msgSentViaProxy"
}
{-# INLINE clientStatsLabels #-}
+1
View File
@@ -113,6 +113,7 @@ cfg =
serverStatsLogFile = "tests/smp-server-stats.daily.log",
serverStatsBackupFile = Nothing,
rateStatsInterval = Nothing,
rateStatsLength = 0,
rateStatsLogFile = "",
rateStatsBackupFile = Nothing,
caCertificateFile = "tests/fixtures/ca.crt",