Files
simplexmq/src/Simplex/Messaging/Server/Stats.hs
T
2026-03-03 21:16:46 +00:00

1115 lines
36 KiB
Haskell

{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
module Simplex.Messaging.Server.Stats
( ServerStats (..),
ServerStatsData (..),
PeriodStats (..),
PeriodStatsData (..),
PeriodStatCounts (..),
ProxyStats (..),
ProxyStatsData (..),
ServiceStats (..),
ServiceStatsData (..),
TimeBuckets (..),
newServerStats,
getServerStatsData,
setServerStats,
newPeriodStats,
newPeriodStatsData,
getPeriodStatsData,
setPeriodStats,
periodStatDataCounts,
periodStatCounts,
updatePeriodStats,
newProxyStats,
newProxyStatsData,
getProxyStatsData,
getResetProxyStatsData,
setProxyStats,
newServiceStatsData,
newServiceStats,
getServiceStatsData,
getResetServiceStatsData,
setServiceStats,
emptyTimeBuckets,
updateTimeBuckets,
) where
import Control.Applicative (optional, (<|>))
import qualified Data.Attoparsec.ByteString.Char8 as A
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.Hashable (hash)
import Data.IORef
import Data.Int (Int64)
import qualified Data.IntMap.Strict as IM
import Data.IntSet (IntSet)
import qualified Data.IntSet as IS
import Data.Set (Set)
import qualified Data.Set as S
import Data.Text (Text)
import Data.Time.Calendar.Month (pattern MonthDay)
import Data.Time.Calendar.OrdinalDate (mondayStartWeek)
import Data.Time.Clock (UTCTime (..))
import GHC.IORef (atomicSwapIORef)
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol (EntityId (..))
import Simplex.Messaging.SystemTime
import Simplex.Messaging.Util (atomicModifyIORef'_, tshow, unlessM)
data ServerStats = ServerStats
{ fromTime :: IORef UTCTime,
qCreated :: IORef Int,
qSecured :: IORef Int,
qDeletedAll :: IORef Int,
qDeletedAllB :: IORef Int,
qDeletedNew :: IORef Int,
qDeletedSecured :: IORef Int,
qBlocked :: IORef Int,
qSub :: IORef Int, -- only includes subscriptions when there were pending messages
-- qSubNoMsg :: IORef Int, -- this stat creates too many STM transactions
qSubAllB :: IORef Int, -- count of all subscription batches (with and without pending messages)
qSubAuth :: IORef Int,
qSubDuplicate :: IORef Int,
qSubProhibited :: IORef Int,
qSubEnd :: IORef Int,
qSubEndB :: IORef Int,
ntfCreated :: IORef Int,
ntfNewCreated :: IORef Int, -- credentials created at the time of queue creation
ntfDeleted :: IORef Int,
ntfDeletedB :: IORef Int,
ntfSub :: IORef Int,
ntfSubB :: IORef Int,
ntfSubAuth :: IORef Int,
ntfSubDuplicate :: IORef Int,
msgSent :: IORef Int,
msgSentAuth :: IORef Int,
msgSentQuota :: IORef Int,
msgSentLarge :: IORef Int,
msgSentBlock :: IORef Int,
msgRecv :: IORef Int,
msgRecvAckTimes :: IORef TimeBuckets,
msgRecvGet :: IORef Int,
msgGet :: IORef Int,
msgGetNoMsg :: IORef Int,
msgGetAuth :: IORef Int,
msgGetDuplicate :: IORef Int,
msgGetProhibited :: IORef Int,
msgExpired :: IORef Int,
activeQueues :: PeriodStats,
-- subscribedQueues :: PeriodStats, -- this stat uses too much memory
msgSentNtf :: IORef Int, -- sent messages with NTF flag
msgRecvNtf :: IORef Int, -- received messages with NTF flag
activeQueuesNtf :: PeriodStats,
msgNtfs :: IORef Int, -- messages notications delivered to NTF server (<= msgSentNtf)
msgNtfsB :: IORef Int, -- messages notication batches delivered to NTF server
msgNtfNoSub :: IORef Int, -- no subscriber to notifications (e.g., NTF server not connected)
msgNtfLost :: IORef Int, -- notification is lost because NTF delivery queue is full
msgNtfExpired :: IORef Int, -- expired
pRelays :: ProxyStats,
pRelaysOwn :: ProxyStats,
pMsgFwds :: ProxyStats,
pMsgFwdsOwn :: ProxyStats,
pMsgFwdsRecv :: IORef Int,
rcvServices :: ServiceStats,
ntfServices :: ServiceStats,
rcvServicesSubMsg :: IORef Int,
rcvServicesSubDuplicate :: IORef Int,
qCount :: IORef Int,
msgCount :: IORef Int,
ntfCount :: IORef Int
}
data ServerStatsData = ServerStatsData
{ _fromTime :: UTCTime,
_qCreated :: Int,
_qSecured :: Int,
_qDeletedAll :: Int,
_qDeletedAllB :: Int,
_qDeletedNew :: Int,
_qDeletedSecured :: Int,
_qBlocked :: Int,
_qSub :: Int,
_qSubAllB :: Int,
_qSubAuth :: Int,
_qSubDuplicate :: Int,
_qSubProhibited :: Int,
_qSubEnd :: Int,
_qSubEndB :: Int,
_ntfCreated :: Int,
_ntfNewCreated :: Int,
_ntfDeleted :: Int,
_ntfDeletedB :: Int,
_ntfSub :: Int,
_ntfSubB :: Int,
_ntfSubAuth :: Int,
_ntfSubDuplicate :: Int,
_msgSent :: Int,
_msgSentAuth :: Int,
_msgSentQuota :: Int,
_msgSentLarge :: Int,
_msgSentBlock :: Int,
_msgRecv :: Int,
_msgRecvAckTimes :: TimeBuckets,
_msgRecvGet :: Int,
_msgGet :: Int,
_msgGetNoMsg :: Int,
_msgGetAuth :: Int,
_msgGetDuplicate :: Int,
_msgGetProhibited :: Int,
_msgExpired :: Int,
_activeQueues :: PeriodStatsData,
_msgSentNtf :: Int,
_msgRecvNtf :: Int,
_activeQueuesNtf :: PeriodStatsData,
_msgNtfs :: Int,
_msgNtfsB :: Int,
_msgNtfNoSub :: Int,
_msgNtfLost :: Int,
_msgNtfExpired :: Int,
_pRelays :: ProxyStatsData,
_pRelaysOwn :: ProxyStatsData,
_pMsgFwds :: ProxyStatsData,
_pMsgFwdsOwn :: ProxyStatsData,
_pMsgFwdsRecv :: Int,
_ntfServices :: ServiceStatsData,
_rcvServices :: ServiceStatsData,
_rcvServicesSubMsg :: Int,
_rcvServicesSubDuplicate :: Int,
_qCount :: Int,
_msgCount :: Int,
_ntfCount :: Int
}
deriving (Show)
newServerStats :: UTCTime -> IO ServerStats
newServerStats ts = do
fromTime <- newIORef ts
qCreated <- newIORef 0
qSecured <- newIORef 0
qDeletedAll <- newIORef 0
qDeletedAllB <- newIORef 0
qDeletedNew <- newIORef 0
qDeletedSecured <- newIORef 0
qBlocked <- newIORef 0
qSub <- newIORef 0
qSubAllB <- newIORef 0
qSubAuth <- newIORef 0
qSubDuplicate <- newIORef 0
qSubProhibited <- newIORef 0
qSubEnd <- newIORef 0
qSubEndB <- newIORef 0
ntfCreated <- newIORef 0
ntfNewCreated <- newIORef 0
ntfDeleted <- newIORef 0
ntfDeletedB <- newIORef 0
ntfSub <- newIORef 0
ntfSubB <- newIORef 0
ntfSubAuth <- newIORef 0
ntfSubDuplicate <- newIORef 0
msgSent <- newIORef 0
msgSentAuth <- newIORef 0
msgSentQuota <- newIORef 0
msgSentLarge <- newIORef 0
msgSentBlock <- newIORef 0
msgRecv <- newIORef 0
msgRecvAckTimes <- newIORef $ TimeBuckets 0 0 IM.empty
msgRecvGet <- newIORef 0
msgGet <- newIORef 0
msgGetNoMsg <- newIORef 0
msgGetAuth <- newIORef 0
msgGetDuplicate <- newIORef 0
msgGetProhibited <- newIORef 0
msgExpired <- newIORef 0
activeQueues <- newPeriodStats
msgSentNtf <- newIORef 0
msgRecvNtf <- newIORef 0
activeQueuesNtf <- newPeriodStats
msgNtfs <- newIORef 0
msgNtfsB <- newIORef 0
msgNtfNoSub <- newIORef 0
msgNtfLost <- newIORef 0
msgNtfExpired <- newIORef 0
pRelays <- newProxyStats
pRelaysOwn <- newProxyStats
pMsgFwds <- newProxyStats
pMsgFwdsOwn <- newProxyStats
pMsgFwdsRecv <- newIORef 0
rcvServices <- newServiceStats
ntfServices <- newServiceStats
rcvServicesSubMsg <- newIORef 0
rcvServicesSubDuplicate <- newIORef 0
qCount <- newIORef 0
msgCount <- newIORef 0
ntfCount <- newIORef 0
pure
ServerStats
{ fromTime,
qCreated,
qSecured,
qDeletedAll,
qDeletedAllB,
qDeletedNew,
qDeletedSecured,
qBlocked,
qSub,
qSubAllB,
qSubAuth,
qSubDuplicate,
qSubProhibited,
qSubEnd,
qSubEndB,
ntfCreated,
ntfNewCreated,
ntfDeleted,
ntfDeletedB,
ntfSub,
ntfSubB,
ntfSubAuth,
ntfSubDuplicate,
msgSent,
msgSentAuth,
msgSentQuota,
msgSentLarge,
msgSentBlock,
msgRecv,
msgRecvAckTimes,
msgRecvGet,
msgGet,
msgGetNoMsg,
msgGetAuth,
msgGetDuplicate,
msgGetProhibited,
msgExpired,
activeQueues,
msgSentNtf,
msgRecvNtf,
activeQueuesNtf,
msgNtfs,
msgNtfsB,
msgNtfNoSub,
msgNtfLost,
msgNtfExpired,
pRelays,
pRelaysOwn,
pMsgFwds,
pMsgFwdsOwn,
pMsgFwdsRecv,
rcvServices,
ntfServices,
rcvServicesSubMsg,
rcvServicesSubDuplicate,
qCount,
msgCount,
ntfCount
}
getServerStatsData :: ServerStats -> IO ServerStatsData
getServerStatsData s = do
_fromTime <- readIORef $ fromTime s
_qCreated <- readIORef $ qCreated s
_qSecured <- readIORef $ qSecured s
_qDeletedAll <- readIORef $ qDeletedAll s
_qDeletedAllB <- readIORef $ qDeletedAllB s
_qDeletedNew <- readIORef $ qDeletedNew s
_qDeletedSecured <- readIORef $ qDeletedSecured s
_qBlocked <- readIORef $ qBlocked s
_qSub <- readIORef $ qSub s
_qSubAllB <- readIORef $ qSubAllB s
_qSubAuth <- readIORef $ qSubAuth s
_qSubDuplicate <- readIORef $ qSubDuplicate s
_qSubProhibited <- readIORef $ qSubProhibited s
_qSubEnd <- readIORef $ qSubEnd s
_qSubEndB <- readIORef $ qSubEndB s
_ntfCreated <- readIORef $ ntfCreated s
_ntfNewCreated <- readIORef $ ntfNewCreated s
_ntfDeleted <- readIORef $ ntfDeleted s
_ntfDeletedB <- readIORef $ ntfDeletedB s
_ntfSub <- readIORef $ ntfSub s
_ntfSubB <- readIORef $ ntfSubB s
_ntfSubAuth <- readIORef $ ntfSubAuth s
_ntfSubDuplicate <- readIORef $ ntfSubDuplicate s
_msgSent <- readIORef $ msgSent s
_msgSentAuth <- readIORef $ msgSentAuth s
_msgSentQuota <- readIORef $ msgSentQuota s
_msgSentLarge <- readIORef $ msgSentLarge s
_msgSentBlock <- readIORef $ msgSentBlock s
_msgRecv <- readIORef $ msgRecv s
_msgRecvAckTimes <- readIORef $ msgRecvAckTimes s
_msgRecvGet <- readIORef $ msgRecvGet s
_msgGet <- readIORef $ msgGet s
_msgGetNoMsg <- readIORef $ msgGetNoMsg s
_msgGetAuth <- readIORef $ msgGetAuth s
_msgGetDuplicate <- readIORef $ msgGetDuplicate s
_msgGetProhibited <- readIORef $ msgGetProhibited s
_msgExpired <- readIORef $ msgExpired s
_activeQueues <- getPeriodStatsData $ activeQueues s
_msgSentNtf <- readIORef $ msgSentNtf s
_msgRecvNtf <- readIORef $ msgRecvNtf s
_activeQueuesNtf <- getPeriodStatsData $ activeQueuesNtf s
_msgNtfs <- readIORef $ msgNtfs s
_msgNtfsB <- readIORef $ msgNtfsB s
_msgNtfNoSub <- readIORef $ msgNtfNoSub s
_msgNtfLost <- readIORef $ msgNtfLost s
_msgNtfExpired <- readIORef $ msgNtfExpired s
_pRelays <- getProxyStatsData $ pRelays s
_pRelaysOwn <- getProxyStatsData $ pRelaysOwn s
_pMsgFwds <- getProxyStatsData $ pMsgFwds s
_pMsgFwdsOwn <- getProxyStatsData $ pMsgFwdsOwn s
_pMsgFwdsRecv <- readIORef $ pMsgFwdsRecv s
_rcvServices <- getServiceStatsData $ rcvServices s
_ntfServices <- getServiceStatsData $ ntfServices s
_rcvServicesSubMsg <- readIORef $ rcvServicesSubMsg s
_rcvServicesSubDuplicate <- readIORef $ rcvServicesSubDuplicate s
_qCount <- readIORef $ qCount s
_msgCount <- readIORef $ msgCount s
_ntfCount <- readIORef $ ntfCount s
pure
ServerStatsData
{ _fromTime,
_qCreated,
_qSecured,
_qDeletedAll,
_qDeletedAllB,
_qDeletedNew,
_qDeletedSecured,
_qBlocked,
_qSub,
_qSubAllB,
_qSubAuth,
_qSubDuplicate,
_qSubProhibited,
_qSubEnd,
_qSubEndB,
_ntfCreated,
_ntfNewCreated,
_ntfDeleted,
_ntfDeletedB,
_ntfSub,
_ntfSubB,
_ntfSubAuth,
_ntfSubDuplicate,
_msgSent,
_msgSentAuth,
_msgSentQuota,
_msgSentLarge,
_msgSentBlock,
_msgRecv,
_msgRecvAckTimes,
_msgRecvGet,
_msgGet,
_msgGetNoMsg,
_msgGetAuth,
_msgGetDuplicate,
_msgGetProhibited,
_msgExpired,
_activeQueues,
_msgSentNtf,
_msgRecvNtf,
_activeQueuesNtf,
_msgNtfs,
_msgNtfsB,
_msgNtfNoSub,
_msgNtfLost,
_msgNtfExpired,
_pRelays,
_pRelaysOwn,
_pMsgFwds,
_pMsgFwdsOwn,
_pMsgFwdsRecv,
_rcvServices,
_ntfServices,
_rcvServicesSubMsg,
_rcvServicesSubDuplicate,
_qCount,
_msgCount,
_ntfCount
}
-- this function is not thread safe, it is used on server start only
setServerStats :: ServerStats -> ServerStatsData -> IO ()
setServerStats s d = do
writeIORef (fromTime s) $! _fromTime d
writeIORef (qCreated s) $! _qCreated d
writeIORef (qSecured s) $! _qSecured d
writeIORef (qDeletedAll s) $! _qDeletedAll d
writeIORef (qDeletedAllB s) $! _qDeletedAllB d
writeIORef (qDeletedNew s) $! _qDeletedNew d
writeIORef (qDeletedSecured s) $! _qDeletedSecured d
writeIORef (qBlocked s) $! _qBlocked d
writeIORef (qSub s) $! _qSub d
writeIORef (qSubAllB s) $! _qSubAllB d
writeIORef (qSubAuth s) $! _qSubAuth d
writeIORef (qSubDuplicate s) $! _qSubDuplicate d
writeIORef (qSubProhibited s) $! _qSubProhibited d
writeIORef (qSubEnd s) $! _qSubEnd d
writeIORef (qSubEndB s) $! _qSubEndB d
writeIORef (ntfCreated s) $! _ntfCreated d
writeIORef (ntfNewCreated s) $! _ntfNewCreated d
writeIORef (ntfDeleted s) $! _ntfDeleted d
writeIORef (ntfDeletedB s) $! _ntfDeletedB d
writeIORef (ntfSub s) $! _ntfSub d
writeIORef (ntfSubB s) $! _ntfSubB d
writeIORef (ntfSubAuth s) $! _ntfSubAuth d
writeIORef (ntfSubDuplicate s) $! _ntfSubDuplicate d
writeIORef (msgSent s) $! _msgSent d
writeIORef (msgSentAuth s) $! _msgSentAuth d
writeIORef (msgSentQuota s) $! _msgSentQuota d
writeIORef (msgSentLarge s) $! _msgSentLarge d
writeIORef (msgSentBlock s) $! _msgSentBlock d
writeIORef (msgRecv s) $! _msgRecv d
writeIORef (msgRecvAckTimes s) $! _msgRecvAckTimes d
writeIORef (msgRecvGet s) $! _msgRecvGet d
writeIORef (msgGet s) $! _msgGet d
writeIORef (msgGetNoMsg s) $! _msgGetNoMsg d
writeIORef (msgGetAuth s) $! _msgGetAuth d
writeIORef (msgGetDuplicate s) $! _msgGetDuplicate d
writeIORef (msgGetProhibited s) $! _msgGetProhibited d
writeIORef (msgExpired s) $! _msgExpired d
setPeriodStats (activeQueues s) (_activeQueues d)
writeIORef (msgSentNtf s) $! _msgSentNtf d
writeIORef (msgRecvNtf s) $! _msgRecvNtf d
setPeriodStats (activeQueuesNtf s) (_activeQueuesNtf d)
writeIORef (msgNtfs s) $! _msgNtfs d
writeIORef (msgNtfsB s) $! _msgNtfsB d
writeIORef (msgNtfNoSub s) $! _msgNtfNoSub d
writeIORef (msgNtfLost s) $! _msgNtfLost d
writeIORef (msgNtfExpired s) $! _msgNtfExpired d
setProxyStats (pRelays s) $! _pRelays d
setProxyStats (pRelaysOwn s) $! _pRelaysOwn d
setProxyStats (pMsgFwds s) $! _pMsgFwds d
setProxyStats (pMsgFwdsOwn s) $! _pMsgFwdsOwn d
writeIORef (pMsgFwdsRecv s) $! _pMsgFwdsRecv d
setServiceStats (rcvServices s) $! _rcvServices d
setServiceStats (ntfServices s) $! _ntfServices d
writeIORef (rcvServicesSubMsg s) $! _rcvServicesSubMsg d
writeIORef (rcvServicesSubDuplicate s) $! _rcvServicesSubDuplicate d
writeIORef (qCount s) $! _qCount d
writeIORef (msgCount s) $! _msgCount d
writeIORef (ntfCount s) $! _ntfCount d
instance StrEncoding ServerStatsData where
strEncode d =
B.unlines
[ "fromTime=" <> strEncode (_fromTime d),
"qCreated=" <> strEncode (_qCreated d),
"qSecured=" <> strEncode (_qSecured d),
"qDeletedAll=" <> strEncode (_qDeletedAll d),
"qDeletedNew=" <> strEncode (_qDeletedNew d),
"qDeletedSecured=" <> strEncode (_qDeletedSecured d),
"qDeletedAllB=" <> strEncode (_qDeletedAllB d),
"qBlocked=" <> strEncode (_qBlocked d),
"qCount=" <> strEncode (_qCount d),
"qSub=" <> strEncode (_qSub d),
"qSubAllB=" <> strEncode (_qSubAllB d),
"qSubAuth=" <> strEncode (_qSubAuth d),
"qSubDuplicate=" <> strEncode (_qSubDuplicate d),
"qSubProhibited=" <> strEncode (_qSubProhibited d),
"qSubEnd=" <> strEncode (_qSubEnd d),
"qSubEndB=" <> strEncode (_qSubEndB d),
"ntfCreated=" <> strEncode (_ntfCreated d),
"ntfNewCreated=" <> strEncode (_ntfNewCreated d),
"ntfDeleted=" <> strEncode (_ntfDeleted d),
"ntfDeletedB=" <> strEncode (_ntfDeletedB d),
"ntfSub=" <> strEncode (_ntfSub d),
"ntfSubB=" <> strEncode (_ntfSubB d),
"ntfSubAuth=" <> strEncode (_ntfSubAuth d),
"ntfSubDuplicate=" <> strEncode (_ntfSubDuplicate d),
"msgSent=" <> strEncode (_msgSent d),
"msgSentAuth=" <> strEncode (_msgSentAuth d),
"msgSentQuota=" <> strEncode (_msgSentQuota d),
"msgSentLarge=" <> strEncode (_msgSentLarge d),
"msgSentBlock=" <> strEncode (_msgSentBlock d),
"msgRecv=" <> strEncode (_msgRecv d),
"msgRecvGet=" <> strEncode (_msgRecvGet d),
"msgGet=" <> strEncode (_msgGet d),
"msgGetNoMsg=" <> strEncode (_msgGetNoMsg d),
"msgGetAuth=" <> strEncode (_msgGetAuth d),
"msgGetDuplicate=" <> strEncode (_msgGetDuplicate d),
"msgGetProhibited=" <> strEncode (_msgGetProhibited d),
"msgExpired=" <> strEncode (_msgExpired d),
"msgSentNtf=" <> strEncode (_msgSentNtf d),
"msgRecvNtf=" <> strEncode (_msgRecvNtf d),
"msgNtfs=" <> strEncode (_msgNtfs d),
"msgNtfsB=" <> strEncode (_msgNtfsB d),
"msgNtfNoSub=" <> strEncode (_msgNtfNoSub d),
"msgNtfLost=" <> strEncode (_msgNtfLost d),
"msgNtfExpired=" <> strEncode (_msgNtfExpired d),
"activeQueues:",
strEncode (_activeQueues d),
"activeQueuesNtf:",
strEncode (_activeQueuesNtf d),
"pRelays:",
strEncode (_pRelays d),
"pRelaysOwn:",
strEncode (_pRelaysOwn d),
"pMsgFwds:",
strEncode (_pMsgFwds d),
"pMsgFwdsOwn:",
strEncode (_pMsgFwdsOwn d),
"pMsgFwdsRecv=" <> strEncode (_pMsgFwdsRecv d),
"rcvServices:",
strEncode (_rcvServices d),
"ntfServices:",
strEncode (_ntfServices d)
]
strP = do
_fromTime <- "fromTime=" *> strP <* A.endOfLine
_qCreated <- "qCreated=" *> strP <* A.endOfLine
_qSecured <- "qSecured=" *> strP <* A.endOfLine
(_qDeletedAll, _qDeletedNew, _qDeletedSecured) <-
(,0,0) <$> ("qDeleted=" *> strP <* A.endOfLine)
<|> ((,,) <$> ("qDeletedAll=" *> strP <* A.endOfLine) <*> ("qDeletedNew=" *> strP <* A.endOfLine) <*> ("qDeletedSecured=" *> strP <* A.endOfLine))
_qDeletedAllB <- opt "qDeletedAllB="
_qBlocked <- opt "qBlocked="
_qCount <- opt "qCount="
_qSub <- opt "qSub="
_qSubNoMsg <- skipInt "qSubNoMsg=" -- skipping it for backward compatibility
_qSubAllB <- opt "qSubAllB="
_qSubAuth <- opt "qSubAuth="
_qSubDuplicate <- opt "qSubDuplicate="
_qSubProhibited <- opt "qSubProhibited="
_qSubEnd <- opt "qSubEnd="
_qSubEndB <- opt "qSubEndB="
_ntfCreated <- opt "ntfCreated="
_ntfNewCreated <- opt "ntfNewCreated="
_ntfDeleted <- opt "ntfDeleted="
_ntfDeletedB <- opt "ntfDeletedB="
_ntfSub <- opt "ntfSub="
_ntfSubB <- opt "ntfSubB="
_ntfSubAuth <- opt "ntfSubAuth="
_ntfSubDuplicate <- opt "ntfSubDuplicate="
_msgSent <- "msgSent=" *> strP <* A.endOfLine
_msgSentAuth <- opt "msgSentAuth="
_msgSentQuota <- opt "msgSentQuota="
_msgSentLarge <- opt "msgSentLarge="
_msgSentBlock <- opt "msgSentBlock="
_msgRecv <- "msgRecv=" *> strP <* A.endOfLine
_msgRecvGet <- opt "msgRecvGet="
_msgGet <- opt "msgGet="
_msgGetNoMsg <- opt "msgGetNoMsg="
_msgGetAuth <- opt "msgGetAuth="
_msgGetDuplicate <- opt "msgGetDuplicate="
_msgGetProhibited <- opt "msgGetProhibited="
_msgExpired <- opt "msgExpired="
_msgSentNtf <- opt "msgSentNtf="
_msgRecvNtf <- opt "msgRecvNtf="
_msgNtfs <- opt "msgNtfs="
_msgNtfsB <- opt "msgNtfsB="
_msgNtfNoSub <- opt "msgNtfNoSub="
_msgNtfLost <- opt "msgNtfLost="
_msgNtfExpired <- opt "msgNtfExpired="
_activeQueues <-
optional ("activeQueues:" <* A.endOfLine) >>= \case
Just _ -> strP <* optional A.endOfLine
_ -> do
_day <- "dayMsgQueues=" *> strP <* A.endOfLine
_week <- "weekMsgQueues=" *> strP <* A.endOfLine
_month <- "monthMsgQueues=" *> strP <* optional A.endOfLine
pure PeriodStatsData {_day, _week, _month}
_subscribedQueues <-
optional ("subscribedQueues:" <* A.endOfLine) >>= \case
Just _ -> newPeriodStatsData <$ (strP @PeriodStatsData <* optional A.endOfLine)
_ -> pure newPeriodStatsData
_activeQueuesNtf <-
optional ("activeQueuesNtf:" <* A.endOfLine) >>= \case
Just _ -> strP <* optional A.endOfLine
_ -> pure newPeriodStatsData
_pRelays <- proxyStatsP "pRelays:"
_pRelaysOwn <- proxyStatsP "pRelaysOwn:"
_pMsgFwds <- proxyStatsP "pMsgFwds:"
_pMsgFwdsOwn <- proxyStatsP "pMsgFwdsOwn:"
_pMsgFwdsRecv <- opt "pMsgFwdsRecv="
_rcvServices <- serviceStatsP "rcvServices:"
_ntfServices <- serviceStatsP "ntfServices:"
pure
ServerStatsData
{ _fromTime,
_qCreated,
_qSecured,
_qDeletedAll,
_qDeletedAllB,
_qDeletedNew,
_qDeletedSecured,
_qBlocked,
_qSub,
_qSubAllB,
_qSubAuth,
_qSubDuplicate,
_qSubProhibited,
_qSubEnd,
_qSubEndB,
_ntfCreated,
_ntfNewCreated,
_ntfDeleted,
_ntfDeletedB,
_ntfSub,
_ntfSubB,
_ntfSubAuth,
_ntfSubDuplicate,
_msgSent,
_msgSentAuth,
_msgSentQuota,
_msgSentLarge,
_msgSentBlock,
_msgRecv,
_msgRecvAckTimes = emptyTimeBuckets,
_msgRecvGet,
_msgGet,
_msgGetNoMsg,
_msgGetAuth,
_msgGetDuplicate,
_msgGetProhibited,
_msgExpired,
_msgSentNtf,
_msgRecvNtf,
_msgNtfs,
_msgNtfsB,
_msgNtfNoSub,
_msgNtfLost,
_msgNtfExpired,
_activeQueues,
_activeQueuesNtf,
_pRelays,
_pRelaysOwn,
_pMsgFwds,
_pMsgFwdsOwn,
_pMsgFwdsRecv,
_rcvServices,
_ntfServices,
_rcvServicesSubMsg = 0,
_rcvServicesSubDuplicate = 0,
_qCount,
_msgCount = 0,
_ntfCount = 0
}
where
opt s = A.string s *> strP <* A.endOfLine <|> pure 0
skipInt s = (0 :: Int) <$ optional (A.string s *> strP @Int *> A.endOfLine)
proxyStatsP key =
optional (A.string key >> A.endOfLine) >>= \case
Just _ -> strP <* optional A.endOfLine
_ -> pure newProxyStatsData
serviceStatsP key =
optional (A.string key >> A.endOfLine) >>= \case
Just _ -> strP <* optional A.endOfLine
_ -> pure newServiceStatsData
data PeriodStats = PeriodStats
{ day :: IORef IntSet,
week :: IORef IntSet,
month :: IORef IntSet
}
newPeriodStats :: IO PeriodStats
newPeriodStats = do
day <- newIORef IS.empty
week <- newIORef IS.empty
month <- newIORef IS.empty
pure PeriodStats {day, week, month}
data PeriodStatsData = PeriodStatsData
{ _day :: IntSet,
_week :: IntSet,
_month :: IntSet
}
deriving (Show)
newPeriodStatsData :: PeriodStatsData
newPeriodStatsData = PeriodStatsData {_day = IS.empty, _week = IS.empty, _month = IS.empty}
getPeriodStatsData :: PeriodStats -> IO PeriodStatsData
getPeriodStatsData s = do
_day <- readIORef $ day s
_week <- readIORef $ week s
_month <- readIORef $ month s
pure PeriodStatsData {_day, _week, _month}
-- this function is not thread safe, it is used on server start only
setPeriodStats :: PeriodStats -> PeriodStatsData -> IO ()
setPeriodStats s d = do
writeIORef (day s) $! _day d
writeIORef (week s) $! _week d
writeIORef (month s) $! _month d
instance StrEncoding PeriodStatsData where
strEncode PeriodStatsData {_day, _week, _month} =
"dayHashes=" <> strEncode _day <> "\nweekHashes=" <> strEncode _week <> "\nmonthHashes=" <> strEncode _month
strP = do
_day <- ("day=" *> bsSetP <|> "dayHashes=" *> strP) <* A.endOfLine
_week <- ("week=" *> bsSetP <|> "weekHashes=" *> strP) <* A.endOfLine
_month <- "month=" *> bsSetP <|> "monthHashes=" *> strP
pure PeriodStatsData {_day, _week, _month}
where
bsSetP = S.foldl' (\s -> (`IS.insert` s) . hash) IS.empty <$> strP @(Set ByteString)
data PeriodStatCounts = PeriodStatCounts
{ dayCount :: Text,
weekCount :: Text,
monthCount :: Text
}
periodStatDataCounts :: PeriodStatsData -> PeriodStatCounts
periodStatDataCounts PeriodStatsData {_day, _week, _month} =
PeriodStatCounts
{ dayCount = tshow $ IS.size _day,
weekCount = tshow $ IS.size _week,
monthCount = tshow $ IS.size _month
}
periodStatCounts :: PeriodStats -> UTCTime -> IO PeriodStatCounts
periodStatCounts ps ts = do
let d = utctDay ts
(_, wDay) = mondayStartWeek d
MonthDay _ mDay = d
dayCount <- periodCount 1 $ day ps
weekCount <- periodCount wDay $ week ps
monthCount <- periodCount mDay $ month ps
pure PeriodStatCounts {dayCount, weekCount, monthCount}
where
periodCount :: Int -> IORef IntSet -> IO Text
periodCount 1 ref = tshow . IS.size <$> atomicSwapIORef ref IS.empty
periodCount _ _ = pure ""
updatePeriodStats :: PeriodStats -> EntityId -> IO ()
updatePeriodStats ps (EntityId pId) = do
updatePeriod $ day ps
updatePeriod $ week ps
updatePeriod $ month ps
where
ph = hash pId
updatePeriod ref = unlessM (IS.member ph <$> readIORef ref) $ atomicModifyIORef'_ ref $ IS.insert ph
data ProxyStats = ProxyStats
{ pRequests :: IORef Int,
pSuccesses :: IORef Int, -- includes destination server error responses that will be forwarded to the client
pErrorsConnect :: IORef Int,
pErrorsCompat :: IORef Int,
pErrorsOther :: IORef Int
}
newProxyStats :: IO ProxyStats
newProxyStats = do
pRequests <- newIORef 0
pSuccesses <- newIORef 0
pErrorsConnect <- newIORef 0
pErrorsCompat <- newIORef 0
pErrorsOther <- newIORef 0
pure ProxyStats {pRequests, pSuccesses, pErrorsConnect, pErrorsCompat, pErrorsOther}
data ProxyStatsData = ProxyStatsData
{ _pRequests :: Int,
_pSuccesses :: Int,
_pErrorsConnect :: Int,
_pErrorsCompat :: Int,
_pErrorsOther :: Int
}
deriving (Show)
newProxyStatsData :: ProxyStatsData
newProxyStatsData = ProxyStatsData {_pRequests = 0, _pSuccesses = 0, _pErrorsConnect = 0, _pErrorsCompat = 0, _pErrorsOther = 0}
getProxyStatsData :: ProxyStats -> IO ProxyStatsData
getProxyStatsData s = do
_pRequests <- readIORef $ pRequests s
_pSuccesses <- readIORef $ pSuccesses s
_pErrorsConnect <- readIORef $ pErrorsConnect s
_pErrorsCompat <- readIORef $ pErrorsCompat s
_pErrorsOther <- readIORef $ pErrorsOther s
pure ProxyStatsData {_pRequests, _pSuccesses, _pErrorsConnect, _pErrorsCompat, _pErrorsOther}
getResetProxyStatsData :: ProxyStats -> IO ProxyStatsData
getResetProxyStatsData s = do
_pRequests <- atomicSwapIORef (pRequests s) 0
_pSuccesses <- atomicSwapIORef (pSuccesses s) 0
_pErrorsConnect <- atomicSwapIORef (pErrorsConnect s) 0
_pErrorsCompat <- atomicSwapIORef (pErrorsCompat s) 0
_pErrorsOther <- atomicSwapIORef (pErrorsOther s) 0
pure ProxyStatsData {_pRequests, _pSuccesses, _pErrorsConnect, _pErrorsCompat, _pErrorsOther}
-- this function is not thread safe, it is used on server start only
setProxyStats :: ProxyStats -> ProxyStatsData -> IO ()
setProxyStats s d = do
writeIORef (pRequests s) $! _pRequests d
writeIORef (pSuccesses s) $! _pSuccesses d
writeIORef (pErrorsConnect s) $! _pErrorsConnect d
writeIORef (pErrorsCompat s) $! _pErrorsCompat d
writeIORef (pErrorsOther s) $! _pErrorsOther d
instance StrEncoding ProxyStatsData where
strEncode ProxyStatsData {_pRequests, _pSuccesses, _pErrorsConnect, _pErrorsCompat, _pErrorsOther} =
"requests="
<> strEncode _pRequests
<> "\nsuccesses="
<> strEncode _pSuccesses
<> "\nerrorsConnect="
<> strEncode _pErrorsConnect
<> "\nerrorsCompat="
<> strEncode _pErrorsCompat
<> "\nerrorsOther="
<> strEncode _pErrorsOther
strP = do
_pRequests <- "requests=" *> strP <* A.endOfLine
_pSuccesses <- "successes=" *> strP <* A.endOfLine
_pErrorsConnect <- "errorsConnect=" *> strP <* A.endOfLine
_pErrorsCompat <- "errorsCompat=" *> strP <* A.endOfLine
_pErrorsOther <- "errorsOther=" *> strP
pure ProxyStatsData {_pRequests, _pSuccesses, _pErrorsConnect, _pErrorsCompat, _pErrorsOther}
data ServiceStats = ServiceStats
{ srvAssocNew :: IORef Int,
srvAssocDuplicate :: IORef Int,
srvAssocUpdated :: IORef Int,
srvAssocRemoved :: IORef Int,
srvSubCount :: IORef Int,
srvSubDuplicate :: IORef Int,
srvSubQueues :: IORef Int,
srvSubEnd :: IORef Int,
-- counts of subscriptions
srvSubOk :: IORef Int, -- server has the same queues as expected
srvSubMore :: IORef Int, -- server has more queues than expected
srvSubFewer :: IORef Int, -- server has fewer queues than expected
srvSubDiff :: IORef Int, -- server has the same count, but different queues than expected (based on xor-hash)
-- adds actual deviations
srvSubMoreTotal :: IORef Int, -- server has more queues than expected, adds diff
srvSubFewerTotal :: IORef Int
}
data ServiceStatsData = ServiceStatsData
{ _srvAssocNew :: Int,
_srvAssocDuplicate :: Int,
_srvAssocUpdated :: Int,
_srvAssocRemoved :: Int,
_srvSubCount :: Int,
_srvSubDuplicate :: Int,
_srvSubQueues :: Int,
_srvSubEnd :: Int,
_srvSubOk :: Int,
_srvSubMore :: Int,
_srvSubFewer :: Int,
_srvSubDiff :: Int,
_srvSubMoreTotal :: Int,
_srvSubFewerTotal :: Int
}
deriving (Show)
newServiceStatsData :: ServiceStatsData
newServiceStatsData =
ServiceStatsData
{ _srvAssocNew = 0,
_srvAssocDuplicate = 0,
_srvAssocUpdated = 0,
_srvAssocRemoved = 0,
_srvSubCount = 0,
_srvSubDuplicate = 0,
_srvSubQueues = 0,
_srvSubEnd = 0,
_srvSubOk = 0,
_srvSubMore = 0,
_srvSubFewer = 0,
_srvSubDiff = 0,
_srvSubMoreTotal = 0,
_srvSubFewerTotal = 0
}
newServiceStats :: IO ServiceStats
newServiceStats = do
srvAssocNew <- newIORef 0
srvAssocDuplicate <- newIORef 0
srvAssocUpdated <- newIORef 0
srvAssocRemoved <- newIORef 0
srvSubCount <- newIORef 0
srvSubDuplicate <- newIORef 0
srvSubQueues <- newIORef 0
srvSubEnd <- newIORef 0
srvSubOk <- newIORef 0
srvSubMore <- newIORef 0
srvSubFewer <- newIORef 0
srvSubDiff <- newIORef 0
srvSubMoreTotal <- newIORef 0
srvSubFewerTotal <- newIORef 0
pure
ServiceStats
{ srvAssocNew,
srvAssocDuplicate,
srvAssocUpdated,
srvAssocRemoved,
srvSubCount,
srvSubDuplicate,
srvSubQueues,
srvSubEnd,
srvSubOk,
srvSubMore,
srvSubFewer,
srvSubDiff,
srvSubMoreTotal,
srvSubFewerTotal
}
getServiceStatsData :: ServiceStats -> IO ServiceStatsData
getServiceStatsData s = do
_srvAssocNew <- readIORef $ srvAssocNew s
_srvAssocDuplicate <- readIORef $ srvAssocDuplicate s
_srvAssocUpdated <- readIORef $ srvAssocUpdated s
_srvAssocRemoved <- readIORef $ srvAssocRemoved s
_srvSubCount <- readIORef $ srvSubCount s
_srvSubDuplicate <- readIORef $ srvSubDuplicate s
_srvSubQueues <- readIORef $ srvSubQueues s
_srvSubEnd <- readIORef $ srvSubEnd s
_srvSubOk <- readIORef $ srvSubOk s
_srvSubMore <- readIORef $ srvSubMore s
_srvSubFewer <- readIORef $ srvSubFewer s
_srvSubDiff <- readIORef $ srvSubDiff s
_srvSubMoreTotal <- readIORef $ srvSubMoreTotal s
_srvSubFewerTotal <- readIORef $ srvSubFewerTotal s
pure
ServiceStatsData
{ _srvAssocNew,
_srvAssocDuplicate,
_srvAssocUpdated,
_srvAssocRemoved,
_srvSubCount,
_srvSubDuplicate,
_srvSubQueues,
_srvSubEnd,
_srvSubOk,
_srvSubMore,
_srvSubFewer,
_srvSubDiff,
_srvSubMoreTotal,
_srvSubFewerTotal
}
getResetServiceStatsData :: ServiceStats -> IO ServiceStatsData
getResetServiceStatsData s = do
_srvAssocNew <- atomicSwapIORef (srvAssocNew s) 0
_srvAssocDuplicate <- atomicSwapIORef (srvAssocDuplicate s) 0
_srvAssocUpdated <- atomicSwapIORef (srvAssocUpdated s) 0
_srvAssocRemoved <- atomicSwapIORef (srvAssocRemoved s) 0
_srvSubCount <- atomicSwapIORef (srvSubCount s) 0
_srvSubDuplicate <- atomicSwapIORef (srvSubDuplicate s) 0
_srvSubQueues <- atomicSwapIORef (srvSubQueues s) 0
_srvSubEnd <- atomicSwapIORef (srvSubEnd s) 0
_srvSubOk <- atomicSwapIORef (srvSubOk s) 0
_srvSubMore <- atomicSwapIORef (srvSubMore s) 0
_srvSubFewer <- atomicSwapIORef (srvSubFewer s) 0
_srvSubDiff <- atomicSwapIORef (srvSubDiff s) 0
_srvSubMoreTotal <- atomicSwapIORef (srvSubMoreTotal s) 0
_srvSubFewerTotal <- atomicSwapIORef (srvSubFewerTotal s) 0
pure
ServiceStatsData
{ _srvAssocNew,
_srvAssocDuplicate,
_srvAssocUpdated,
_srvAssocRemoved,
_srvSubCount,
_srvSubDuplicate,
_srvSubQueues,
_srvSubEnd,
_srvSubOk,
_srvSubMore,
_srvSubFewer,
_srvSubDiff,
_srvSubMoreTotal,
_srvSubFewerTotal
}
-- this function is not thread safe, it is used on server start only
setServiceStats :: ServiceStats -> ServiceStatsData -> IO ()
setServiceStats s d = do
writeIORef (srvAssocNew s) $! _srvAssocNew d
writeIORef (srvAssocDuplicate s) $! _srvAssocDuplicate d
writeIORef (srvAssocUpdated s) $! _srvAssocUpdated d
writeIORef (srvAssocRemoved s) $! _srvAssocRemoved d
writeIORef (srvSubCount s) $! _srvSubCount d
writeIORef (srvSubDuplicate s) $! _srvSubDuplicate d
writeIORef (srvSubQueues s) $! _srvSubQueues d
writeIORef (srvSubEnd s) $! _srvSubEnd d
writeIORef (srvSubOk s) $! _srvSubOk d
writeIORef (srvSubMore s) $! _srvSubMore d
writeIORef (srvSubFewer s) $! _srvSubFewer d
writeIORef (srvSubDiff s) $! _srvSubDiff d
writeIORef (srvSubMoreTotal s) $! _srvSubMoreTotal d
writeIORef (srvSubFewerTotal s) $! _srvSubFewerTotal d
instance StrEncoding ServiceStatsData where
strEncode ServiceStatsData {_srvAssocNew, _srvAssocDuplicate, _srvAssocUpdated, _srvAssocRemoved, _srvSubCount, _srvSubDuplicate, _srvSubQueues, _srvSubEnd} =
"assocNew="
<> strEncode _srvAssocNew
<> "\nassocDuplicate="
<> strEncode _srvAssocDuplicate
<> "\nassocUpdatedt="
<> strEncode _srvAssocUpdated
<> "\nassocRemoved="
<> strEncode _srvAssocRemoved
<> "\nsubCount="
<> strEncode _srvSubCount
<> "\nsubDuplicate="
<> strEncode _srvSubDuplicate
<> "\nsubQueues="
<> strEncode _srvSubQueues
<> "\nsubEnd="
<> strEncode _srvSubEnd
strP = do
_srvAssocNew <- "assocNew=" *> strP <* A.endOfLine
_srvAssocDuplicate <- "assocDuplicate=" *> strP <* A.endOfLine
_srvAssocUpdated <- "assocUpdatedt=" *> strP <* A.endOfLine
_srvAssocRemoved <- "assocRemoved=" *> strP <* A.endOfLine
_srvSubCount <- "subCount=" *> strP <* A.endOfLine
_srvSubDuplicate <- "subDuplicate=" *> strP <* A.endOfLine
_srvSubQueues <- "subQueues=" *> strP <* A.endOfLine
_srvSubEnd <- "subEnd=" *> strP
pure
ServiceStatsData
{ _srvAssocNew,
_srvAssocDuplicate,
_srvAssocUpdated,
_srvAssocRemoved,
_srvSubCount,
_srvSubDuplicate,
_srvSubQueues,
_srvSubEnd,
_srvSubOk = 0,
_srvSubMore = 0,
_srvSubFewer = 0,
_srvSubDiff = 0,
_srvSubMoreTotal = 0,
_srvSubFewerTotal = 0
}
data TimeBuckets = TimeBuckets
{ sumTime :: Int64,
maxTime :: Int64,
timeBuckets :: IM.IntMap Int
}
deriving (Show)
emptyTimeBuckets :: TimeBuckets
emptyTimeBuckets = TimeBuckets 0 0 IM.empty
updateTimeBuckets :: SystemSeconds -> SystemSeconds -> TimeBuckets -> TimeBuckets
updateTimeBuckets
(RoundedSystemTime deliveryTime)
(RoundedSystemTime currTime)
TimeBuckets {sumTime, maxTime, timeBuckets} =
TimeBuckets
{ sumTime = sumTime + t,
maxTime = max maxTime t,
timeBuckets = IM.alter (Just . maybe 1 (+ 1)) seconds timeBuckets
}
where
t = currTime - deliveryTime
seconds
| t <= 5 = fromIntegral t
| t <= 30 = t `toBucket` 5
| t <= 60 = t `toBucket` 10
| t <= 180 = t `toBucket` 30
| otherwise = t `toBucket` 60
toBucket n m = - fromIntegral (((- n) `div` m) * m) -- round up