mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-30 20:45:52 +00:00
smp server: prometheus histogram for message confirmation times (ACK) (#1575)
* time buckets * split max time metric * histogram * histogram for confirmed delivery times * gaugehistogram * fix created, _ in gauge_histogram * remove comments * fix metrics
This commit is contained in:
@@ -684,24 +684,29 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
|
||||
let threadsCount = 0
|
||||
#endif
|
||||
clientsCount <- IM.size <$> getServerClients srv
|
||||
deliveredSubs <- getDeliveredMetrics
|
||||
(deliveredSubs, deliveredTimes) <- getDeliveredMetrics =<< getSystemSeconds
|
||||
smpSubs <- getSubscribersMetrics subscribers
|
||||
ntfSubs <- getSubscribersMetrics ntfSubscribers
|
||||
loadedCounts <- loadedQueueCounts $ fromMsgStore ms
|
||||
pure RealTimeMetrics {socketStats, threadsCount, clientsCount, deliveredSubs, smpSubs, ntfSubs, loadedCounts}
|
||||
pure RealTimeMetrics {socketStats, threadsCount, clientsCount, deliveredSubs, deliveredTimes, smpSubs, ntfSubs, loadedCounts}
|
||||
where
|
||||
getSubscribersMetrics ServerSubscribers {queueSubscribers, serviceSubscribers, subClients} = do
|
||||
subsCount <- M.size <$> getSubscribedClients queueSubscribers
|
||||
subClientsCount <- IS.size <$> readTVarIO subClients
|
||||
subServicesCount <- M.size <$> getSubscribedClients serviceSubscribers
|
||||
pure RTSubscriberMetrics {subsCount, subClientsCount, subServicesCount}
|
||||
getDeliveredMetrics = foldM countClnt (RTSubscriberMetrics 0 0 0) =<< getServerClients srv
|
||||
countClnt metrics Client {subscriptions} = do
|
||||
cnt <- foldM countSubs 0 =<< readTVarIO subscriptions
|
||||
pure $ if cnt > 0
|
||||
then metrics {subsCount = subsCount metrics + cnt, subClientsCount = subClientsCount metrics + 1}
|
||||
else metrics
|
||||
countSubs !cnt Sub {delivered} = (\empty -> if empty then cnt else cnt + 1) <$> atomically (isEmptyTMVar delivered)
|
||||
getDeliveredMetrics ts' = foldM countClnt (RTSubscriberMetrics 0 0 0, emptyTimeBuckets) =<< getServerClients srv
|
||||
where
|
||||
countClnt acc@(metrics, times) Client {subscriptions} = do
|
||||
(cnt, times') <- foldM countSubs (0, times) =<< readTVarIO subscriptions
|
||||
pure $ if cnt > 0
|
||||
then (metrics {subsCount = subsCount metrics + cnt, subClientsCount = subClientsCount metrics + 1}, times')
|
||||
else acc
|
||||
countSubs acc@(!cnt, times) Sub {delivered} = do
|
||||
delivered_ <- atomically $ tryReadTMVar delivered
|
||||
pure $ case delivered_ of
|
||||
Nothing -> acc
|
||||
Just (_, ts) -> (cnt + 1, updateTimeBuckets ts ts' times)
|
||||
|
||||
runClient :: Transport c => X.CertificateChain -> C.APrivateSignKey -> TProxy c 'TServer -> c 'TServer -> M s ()
|
||||
runClient srvCert srvSignKey tp h = do
|
||||
@@ -1633,7 +1638,7 @@ client
|
||||
-- This is tracked as "subscription" in the client to prevent these
|
||||
-- clients from being able to subscribe.
|
||||
pure s
|
||||
getMessage_ :: Sub -> Maybe MsgId -> M s (Transmission BrokerMsg)
|
||||
getMessage_ :: Sub -> Maybe (MsgId, RoundedSystemTime) -> M s (Transmission BrokerMsg)
|
||||
getMessage_ s delivered_ = do
|
||||
stats <- asks serverStats
|
||||
fmap (either err id) $ liftIO $ runExceptT $
|
||||
@@ -1641,7 +1646,8 @@ client
|
||||
Just msg -> do
|
||||
let encMsg = encryptMsg qr msg
|
||||
incStat $ (if isJust delivered_ then msgGetDuplicate else msgGet) stats
|
||||
atomically $ setDelivered s msg $> (corrId, entId, MSG encMsg)
|
||||
ts <- liftIO getSystemSeconds
|
||||
atomically $ setDelivered s msg ts $> (corrId, entId, MSG encMsg)
|
||||
Nothing -> incStat (msgGetNoMsg stats) $> ok
|
||||
|
||||
withQueue :: (StoreQueue s -> QueueRec -> M s (Transmission BrokerMsg)) -> M s (Transmission BrokerMsg)
|
||||
@@ -1734,28 +1740,28 @@ client
|
||||
Nothing -> pure $ err NO_MSG
|
||||
Just sub ->
|
||||
atomically (getDelivered sub) >>= \case
|
||||
Just st -> do
|
||||
Just (st, ts) -> do
|
||||
stats <- asks serverStats
|
||||
fmap (either err id) $ liftIO $ runExceptT $ do
|
||||
case st of
|
||||
ProhibitSub -> do
|
||||
deletedMsg_ <- tryDelMsg ms q msgId
|
||||
liftIO $ mapM_ (updateStats stats True) deletedMsg_
|
||||
liftIO $ mapM_ (updateStats stats True ts) deletedMsg_
|
||||
pure ok
|
||||
_ -> do
|
||||
(deletedMsg_, msg_) <- tryDelPeekMsg ms q msgId
|
||||
liftIO $ mapM_ (updateStats stats False) deletedMsg_
|
||||
liftIO $ mapM_ (updateStats stats False ts) deletedMsg_
|
||||
liftIO $ deliverMessage "ACK" qr entId sub msg_
|
||||
_ -> pure $ err NO_MSG
|
||||
where
|
||||
getDelivered :: Sub -> STM (Maybe ServerSub)
|
||||
getDelivered :: Sub -> STM (Maybe (ServerSub, RoundedSystemTime))
|
||||
getDelivered Sub {delivered, subThread} = do
|
||||
tryTakeTMVar delivered $>>= \msgId' ->
|
||||
tryTakeTMVar delivered $>>= \v@(msgId', ts) ->
|
||||
if msgId == msgId' || B.null msgId
|
||||
then pure $ Just subThread
|
||||
else putTMVar delivered msgId' $> Nothing
|
||||
updateStats :: ServerStats -> Bool -> Message -> IO ()
|
||||
updateStats stats isGet = \case
|
||||
then pure $ Just (subThread, ts)
|
||||
else putTMVar delivered v $> Nothing
|
||||
updateStats :: ServerStats -> Bool -> RoundedSystemTime -> Message -> IO ()
|
||||
updateStats stats isGet deliveryTime = \case
|
||||
MessageQuota {} -> pure ()
|
||||
Message {msgFlags} -> do
|
||||
incStat $ msgRecv stats
|
||||
@@ -1772,6 +1778,8 @@ client
|
||||
when (notification msgFlags) $ do
|
||||
incStat $ msgRecvNtf stats
|
||||
updatePeriodStats (activeQueuesNtf stats) entId
|
||||
currTime <- getSystemSeconds
|
||||
atomicModifyIORef'_ (msgRecvAckTimes stats) $ updateTimeBuckets deliveryTime currTime
|
||||
|
||||
sendMessage :: MsgFlags -> MsgBody -> StoreQueue s -> QueueRec -> M s (Transmission BrokerMsg)
|
||||
sendMessage msgFlags msgBody q qr
|
||||
@@ -1839,33 +1847,35 @@ client
|
||||
-- the subscribed client var is read outside of STM to avoid transaction cost
|
||||
-- in case no client is subscribed.
|
||||
getSubscribedClient rId (queueSubscribers subscribers)
|
||||
$>>= atomically . deliverToSub
|
||||
$>>= deliverToSub
|
||||
>>= mapM_ forkDeliver
|
||||
where
|
||||
rId = recipientId q
|
||||
deliverToSub rcv =
|
||||
deliverToSub rcv = do
|
||||
ts <- getSystemSeconds
|
||||
atomically $
|
||||
-- reading client TVar in the same transaction,
|
||||
-- so that if subscription ends, it re-evalutates
|
||||
-- and delivery is cancelled -
|
||||
-- the new client will receive message in response to SUB.
|
||||
readTVar rcv
|
||||
$>>= \rc@Client {subscriptions = subs, sndQ = sndQ'} -> TM.lookup rId subs
|
||||
$>>= \s@Sub {subThread, delivered} -> case subThread of
|
||||
ProhibitSub -> pure Nothing
|
||||
ServerSub st -> readTVar st >>= \case
|
||||
NoSub ->
|
||||
tryTakeTMVar delivered >>= \case
|
||||
Just _ -> pure Nothing -- if a message was already delivered, should not deliver more
|
||||
Nothing ->
|
||||
ifM
|
||||
(isFullTBQueue sndQ')
|
||||
(writeTVar st SubPending $> Just (rc, s, st))
|
||||
(deliver sndQ' s $> Nothing)
|
||||
_ -> pure Nothing
|
||||
deliver sndQ' s = do
|
||||
readTVar rcv
|
||||
$>>= \rc@Client {subscriptions = subs, sndQ = sndQ'} -> TM.lookup rId subs
|
||||
$>>= \s@Sub {subThread, delivered} -> case subThread of
|
||||
ProhibitSub -> pure Nothing
|
||||
ServerSub st -> readTVar st >>= \case
|
||||
NoSub ->
|
||||
tryTakeTMVar delivered >>= \case
|
||||
Just _ -> pure Nothing -- if a message was already delivered, should not deliver more
|
||||
Nothing ->
|
||||
ifM
|
||||
(isFullTBQueue sndQ')
|
||||
(writeTVar st SubPending $> Just (rc, s, st))
|
||||
(deliver sndQ' s ts $> Nothing)
|
||||
_ -> pure Nothing
|
||||
deliver sndQ' s ts = do
|
||||
let encMsg = encryptMsg qr msg
|
||||
writeTBQueue sndQ' [(CorrId "", rId, MSG encMsg)]
|
||||
void $ setDelivered s msg
|
||||
void $ setDelivered s msg ts
|
||||
forkDeliver (rc@Client {sndQ = sndQ'}, s@Sub {delivered}, st) = do
|
||||
t <- mkWeakThreadId =<< forkIO deliverThread
|
||||
atomically $ modifyTVar' st $ \case
|
||||
@@ -1878,13 +1888,14 @@ client
|
||||
-- lookup can be outside of STM transaction,
|
||||
-- as long as the check that it is the same client is inside.
|
||||
getSubscribedClient rId (queueSubscribers subscribers) >>= mapM_ deliverIfSame
|
||||
deliverIfSame rcv = time "deliver" . atomically $
|
||||
whenM (sameClient rc rcv) $
|
||||
deliverIfSame rcv = time "deliver" $ do
|
||||
ts <- getSystemSeconds
|
||||
atomically $ whenM (sameClient rc rcv) $
|
||||
tryTakeTMVar delivered >>= \case
|
||||
Just _ -> pure () -- if a message was already delivered, should not deliver more
|
||||
Nothing -> do
|
||||
-- a separate thread is needed because it blocks when client sndQ is full.
|
||||
deliver sndQ' s
|
||||
deliver sndQ' s ts
|
||||
writeTVar st NoSub
|
||||
|
||||
enqueueNotification :: NtfCreds -> Message -> M s ()
|
||||
@@ -1958,13 +1969,14 @@ client
|
||||
VRFailed e -> Left (corrId', entId', ERR e)
|
||||
|
||||
deliverMessage :: T.Text -> QueueRec -> RecipientId -> Sub -> Maybe Message -> IO (Transmission BrokerMsg)
|
||||
deliverMessage name qr rId s@Sub {subThread} msg_ = time (name <> " deliver") . atomically $
|
||||
deliverMessage name qr rId s@Sub {subThread} msg_ = time (name <> " deliver") $
|
||||
case subThread of
|
||||
ProhibitSub -> pure resp
|
||||
_ -> case msg_ of
|
||||
Just msg ->
|
||||
Just msg -> do
|
||||
ts <- getSystemSeconds
|
||||
let encMsg = encryptMsg qr msg
|
||||
in setDelivered s msg $> (corrId, rId, MSG encMsg)
|
||||
atomically (setDelivered s msg ts) $> (corrId, rId, MSG encMsg)
|
||||
_ -> pure resp
|
||||
where
|
||||
resp = (corrId, rId, OK)
|
||||
@@ -1982,8 +1994,10 @@ client
|
||||
msgId' = messageId msg
|
||||
msgTs' = messageTs msg
|
||||
|
||||
setDelivered :: Sub -> Message -> STM Bool
|
||||
setDelivered s msg = tryPutTMVar (delivered s) $! messageId msg
|
||||
setDelivered :: Sub -> Message -> RoundedSystemTime -> STM Bool
|
||||
setDelivered Sub {delivered} msg !ts = do
|
||||
let !msgId = messageId msg
|
||||
tryPutTMVar delivered (msgId, ts)
|
||||
|
||||
delQueueAndMsgs :: (StoreQueue s, QueueRec) -> M s (Transmission BrokerMsg)
|
||||
delQueueAndMsgs (q, QueueRec {rcvServiceId}) = do
|
||||
@@ -2024,7 +2038,7 @@ client
|
||||
SubPending -> QSubPending
|
||||
SubThread _ -> QSubThread
|
||||
ProhibitSub -> pure QProhibitSub
|
||||
qDelivered <- atomically $ decodeLatin1 . encode <$$> tryReadTMVar delivered
|
||||
qDelivered <- atomically $ decodeLatin1 . encode . fst <$$> tryReadTMVar delivered
|
||||
pure QSub {qSubThread, qDelivered}
|
||||
|
||||
ok :: Transmission BrokerMsg
|
||||
|
||||
@@ -412,7 +412,7 @@ data SubscriptionThread = NoSub | SubPending | SubThread (Weak ThreadId)
|
||||
|
||||
data Sub = Sub
|
||||
{ subThread :: ServerSub, -- Nothing value indicates that sub
|
||||
delivered :: TMVar MsgId
|
||||
delivered :: TMVar (MsgId, RoundedSystemTime)
|
||||
}
|
||||
|
||||
newServer :: IO (Server s)
|
||||
|
||||
@@ -6,6 +6,8 @@
|
||||
module Simplex.Messaging.Server.Prometheus where
|
||||
|
||||
import Data.Int (Int64)
|
||||
import qualified Data.IntMap.Strict as IM
|
||||
import Data.List (mapAccumL)
|
||||
import Data.Text (Text)
|
||||
import qualified Data.Text as T
|
||||
import Data.Time.Clock (UTCTime (..), diffUTCTime)
|
||||
@@ -35,6 +37,7 @@ data RealTimeMetrics = RealTimeMetrics
|
||||
threadsCount :: Int,
|
||||
clientsCount :: Int,
|
||||
deliveredSubs :: RTSubscriberMetrics,
|
||||
deliveredTimes :: TimeBuckets,
|
||||
smpSubs :: RTSubscriberMetrics,
|
||||
ntfSubs :: RTSubscriberMetrics,
|
||||
loadedCounts :: LoadedQueueCounts
|
||||
@@ -57,6 +60,7 @@ prometheusMetrics sm rtm ts =
|
||||
threadsCount,
|
||||
clientsCount,
|
||||
deliveredSubs,
|
||||
deliveredTimes,
|
||||
smpSubs,
|
||||
ntfSubs,
|
||||
loadedCounts
|
||||
@@ -90,6 +94,7 @@ prometheusMetrics sm rtm ts =
|
||||
_msgSentLarge,
|
||||
_msgSentBlock,
|
||||
_msgRecv,
|
||||
_msgRecvAckTimes,
|
||||
_msgRecvGet,
|
||||
_msgGet,
|
||||
_msgGetNoMsg,
|
||||
@@ -436,6 +441,25 @@ prometheusMetrics sm rtm ts =
|
||||
\# TYPE simplex_smp_delivered_clients_total gauge\n\
|
||||
\simplex_smp_delivered_clients_total " <> mshow (subClientsCount deliveredSubs) <> "\n# delivered.subClientsCount\n\
|
||||
\\n\
|
||||
\# HELP simplex_smp_delivery_ack_confirmed_time Times to confirm message delivery, only confirmed deliveries\n\
|
||||
\# TYPE simplex_smp_delivery_ack_confirmed_time histogram\n\
|
||||
\simplex_smp_delivery_ack_confirmed_time_sum " <> mshow (sumTime _msgRecvAckTimes) <> "\n\
|
||||
\simplex_smp_delivery_ack_confirmed_time_count " <> mshow (_msgRecv + _msgRecvGet) <> "\n"
|
||||
<> showTimeBuckets "simplex_smp_delivery_ack_confirmed_time" (timeBuckets _msgRecvAckTimes)
|
||||
<> showTimeBucket "simplex_smp_delivery_ack_confirmed_time" "+Inf" (_msgRecv + _msgRecvGet)
|
||||
<> "\n\
|
||||
\# HELP simplex_smp_delivery_ack_confirmed_count Counts for confirmed deliveries\n\
|
||||
\# TYPE simplex_smp_delivery_ack_confirmed_count counter\n"
|
||||
<> showBucketSums "simplex_smp_delivery_ack_confirmed_count" (timeBuckets _msgRecvAckTimes)
|
||||
<> "\n\
|
||||
\# HELP simplex_smp_delivery_ack_pending_count Counts for pending delivery\n\
|
||||
\# TYPE simplex_smp_delivery_ack_pending_count gauge\n"
|
||||
<> showBucketSums "simplex_smp_delivery_ack_pending_count" (timeBuckets deliveredTimes)
|
||||
<> "\n\
|
||||
\# HELP simplex_smp_delivery_ack_time_max Max time to confirm message delivery\n\
|
||||
\# TYPE simplex_smp_delivery_ack_time_max gauge\n\
|
||||
\simplex_smp_delivery_ack_time_max " <> mshow (maxTime deliveredTimes) <> "\n# delivered.maxTime\n\
|
||||
\\n\
|
||||
\# HELP simplex_smp_subscribtion_total Total SMP subscriptions\n\
|
||||
\# TYPE simplex_smp_subscribtion_total gauge\n\
|
||||
\simplex_smp_subscribtion_total " <> mshow (subsCount smpSubs) <> "\n# smp.subsCount\n\
|
||||
@@ -480,15 +504,32 @@ prometheusMetrics sm rtm ts =
|
||||
\# TYPE simplex_smp_loaded_queues_ntf_lock_count gauge\n\
|
||||
\simplex_smp_loaded_queues_ntf_lock_count " <> mshow (notifierLockCount loadedCounts) <> "\n# loadedCounts.notifierLockCount\n"
|
||||
|
||||
showTimeBuckets :: Text -> IM.IntMap Int -> Text
|
||||
showTimeBuckets metric = T.concat . snd . mapAccumL accumBucket (0, 0) . IM.assocs
|
||||
where
|
||||
accumBucket (prevSec, total) (sec, cnt) =
|
||||
let t
|
||||
| sec - 60 > prevSec = showTimeBucket metric (tshow (sec - 60)) total
|
||||
| otherwise = ""
|
||||
in ((sec, total + cnt), t <> showTimeBucket metric (tshow sec) (total + cnt))
|
||||
showTimeBucket :: Text -> Text -> Int -> Text
|
||||
showTimeBucket metric sec count = metric <> "_bucket{le=\"" <> sec <> "\"} " <> mshow count <> "\n"
|
||||
showBucketSums :: Text -> IM.IntMap Int -> Text
|
||||
showBucketSums metric buckets = T.concat $ map showBucketSum [(0, 60), (60, 300), (300, 1200), (1200, 3600), (3600, maxBound)]
|
||||
where
|
||||
showBucketSum (minTime, maxTime) =
|
||||
metric <> "{period=\"" <> tshow minTime <> (if maxTime <= 3600 then "-" <> tshow maxTime else "+") <> "\"} " <> mshow bucketsSum <> "\n"
|
||||
where
|
||||
bucketsSum = IM.foldl' (+) 0 $ IM.filter (\sec -> minTime <= sec && sec < maxTime) buckets
|
||||
socketsMetric :: (SocketStats -> Int) -> Text -> Text -> Text
|
||||
socketsMetric sel metric descr =
|
||||
"# HELP " <> metric <> " " <> descr <> "\n"
|
||||
<> "# TYPE " <> metric <> " gauge\n"
|
||||
<> T.concat (map (\(port, ss) -> metric <> "{port=\"" <> T.pack port <> "\"} " <> mshow (sel ss) <> "\n") socketStats)
|
||||
<> "\n"
|
||||
mstr a = a <> " " <> tsEpoch
|
||||
mstr a = a <> " " <> tsEpoch ts
|
||||
mshow :: Show a => a -> Text
|
||||
mshow = mstr . tshow
|
||||
tsEpoch = tshow @Int64 $ floor @Double $ realToFrac (ts `diffUTCTime` epoch) * 1000
|
||||
tsEpoch t = tshow @Int64 $ floor @Double $ realToFrac (t `diffUTCTime` epoch) * 1000
|
||||
epoch = UTCTime systemEpochDay 0
|
||||
{-# FOURMOLU_ENABLE\n#-}
|
||||
|
||||
@@ -127,3 +127,6 @@ getRoundedSystemTime prec = (\t -> RoundedSystemTime $ (systemSeconds t `div` pr
|
||||
|
||||
getSystemDate :: IO RoundedSystemTime
|
||||
getSystemDate = getRoundedSystemTime 86400
|
||||
|
||||
getSystemSeconds :: IO RoundedSystemTime
|
||||
getSystemSeconds = RoundedSystemTime . systemSeconds <$> getSystemTime
|
||||
|
||||
@@ -14,6 +14,8 @@ import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.Hashable (hash)
|
||||
import Data.IORef
|
||||
import Data.Int (Int64)
|
||||
import qualified Data.IntMap.Strict as IM
|
||||
import Data.IntSet (IntSet)
|
||||
import qualified Data.IntSet as IS
|
||||
import Data.Set (Set)
|
||||
@@ -25,6 +27,7 @@ import Data.Time.Clock (UTCTime (..))
|
||||
import GHC.IORef (atomicSwapIORef)
|
||||
import Simplex.Messaging.Encoding.String
|
||||
import Simplex.Messaging.Protocol (EntityId (..))
|
||||
import Simplex.Messaging.Server.QueueStore (RoundedSystemTime (..))
|
||||
import Simplex.Messaging.Util (atomicModifyIORef'_, tshow, unlessM)
|
||||
|
||||
data ServerStats = ServerStats
|
||||
@@ -57,6 +60,7 @@ data ServerStats = ServerStats
|
||||
msgSentLarge :: IORef Int,
|
||||
msgSentBlock :: IORef Int,
|
||||
msgRecv :: IORef Int,
|
||||
msgRecvAckTimes :: IORef TimeBuckets,
|
||||
msgRecvGet :: IORef Int,
|
||||
msgGet :: IORef Int,
|
||||
msgGetNoMsg :: IORef Int,
|
||||
@@ -115,6 +119,7 @@ data ServerStatsData = ServerStatsData
|
||||
_msgSentLarge :: Int,
|
||||
_msgSentBlock :: Int,
|
||||
_msgRecv :: Int,
|
||||
_msgRecvAckTimes :: TimeBuckets,
|
||||
_msgRecvGet :: Int,
|
||||
_msgGet :: Int,
|
||||
_msgGetNoMsg :: Int,
|
||||
@@ -174,6 +179,7 @@ newServerStats ts = do
|
||||
msgSentLarge <- newIORef 0
|
||||
msgSentBlock <- newIORef 0
|
||||
msgRecv <- newIORef 0
|
||||
msgRecvAckTimes <- newIORef $ TimeBuckets 0 0 IM.empty
|
||||
msgRecvGet <- newIORef 0
|
||||
msgGet <- newIORef 0
|
||||
msgGetNoMsg <- newIORef 0
|
||||
@@ -230,6 +236,7 @@ newServerStats ts = do
|
||||
msgSentLarge,
|
||||
msgSentBlock,
|
||||
msgRecv,
|
||||
msgRecvAckTimes,
|
||||
msgRecvGet,
|
||||
msgGet,
|
||||
msgGetNoMsg,
|
||||
@@ -288,6 +295,7 @@ getServerStatsData s = do
|
||||
_msgSentLarge <- readIORef $ msgSentLarge s
|
||||
_msgSentBlock <- readIORef $ msgSentBlock s
|
||||
_msgRecv <- readIORef $ msgRecv s
|
||||
_msgRecvAckTimes <- readIORef $ msgRecvAckTimes s
|
||||
_msgRecvGet <- readIORef $ msgRecvGet s
|
||||
_msgGet <- readIORef $ msgGet s
|
||||
_msgGetNoMsg <- readIORef $ msgGetNoMsg s
|
||||
@@ -344,6 +352,7 @@ getServerStatsData s = do
|
||||
_msgSentLarge,
|
||||
_msgSentBlock,
|
||||
_msgRecv,
|
||||
_msgRecvAckTimes,
|
||||
_msgRecvGet,
|
||||
_msgGet,
|
||||
_msgGetNoMsg,
|
||||
@@ -403,6 +412,7 @@ setServerStats s d = do
|
||||
writeIORef (msgSentLarge s) $! _msgSentLarge d
|
||||
writeIORef (msgSentBlock s) $! _msgSentBlock d
|
||||
writeIORef (msgRecv s) $! _msgRecv d
|
||||
writeIORef (msgRecvAckTimes s) $! _msgRecvAckTimes d
|
||||
writeIORef (msgRecvGet s) $! _msgRecvGet d
|
||||
writeIORef (msgGet s) $! _msgGet d
|
||||
writeIORef (msgGetNoMsg s) $! _msgGetNoMsg d
|
||||
@@ -592,6 +602,7 @@ instance StrEncoding ServerStatsData where
|
||||
_msgSentLarge,
|
||||
_msgSentBlock,
|
||||
_msgRecv,
|
||||
_msgRecvAckTimes = emptyTimeBuckets,
|
||||
_msgRecvGet,
|
||||
_msgGet,
|
||||
_msgGetNoMsg,
|
||||
@@ -944,3 +955,33 @@ instance StrEncoding ServiceStatsData where
|
||||
_srvSubQueues,
|
||||
_srvSubEnd
|
||||
}
|
||||
|
||||
data TimeBuckets = TimeBuckets
|
||||
{ sumTime :: Int64,
|
||||
maxTime :: Int64,
|
||||
timeBuckets :: IM.IntMap Int
|
||||
}
|
||||
deriving (Show)
|
||||
|
||||
emptyTimeBuckets :: TimeBuckets
|
||||
emptyTimeBuckets = TimeBuckets 0 0 IM.empty
|
||||
|
||||
updateTimeBuckets :: RoundedSystemTime -> RoundedSystemTime -> TimeBuckets -> TimeBuckets
|
||||
updateTimeBuckets
|
||||
(RoundedSystemTime deliveryTime)
|
||||
(RoundedSystemTime currTime)
|
||||
TimeBuckets {sumTime, maxTime, timeBuckets} =
|
||||
TimeBuckets
|
||||
{ sumTime = sumTime + t,
|
||||
maxTime = max maxTime t,
|
||||
timeBuckets = IM.alter (Just . maybe 1 (+ 1)) seconds timeBuckets
|
||||
}
|
||||
where
|
||||
t = currTime - deliveryTime
|
||||
seconds
|
||||
| t <= 5 = fromIntegral t
|
||||
| t <= 30 = t `toBucket` 5
|
||||
| t <= 60 = t `toBucket` 10
|
||||
| t <= 180 = t `toBucket` 30
|
||||
| otherwise = t `toBucket` 60
|
||||
toBucket n m = - fromIntegral (((- n) `div` m) * m) -- round up
|
||||
|
||||
Reference in New Issue
Block a user