time buckets

This commit is contained in:
Evgeny Poberezkin
2025-06-18 11:28:03 +01:00
parent 1ed9f2fb8f
commit cd91808b2c
4 changed files with 85 additions and 41 deletions
+61 -40
View File
@@ -683,24 +683,38 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
let threadsCount = 0
#endif
clientsCount <- IM.size <$> getServerClients srv
deliveredSubs <- getDeliveredMetrics
(deliveredSubs, deliveredTimes) <- getDeliveredMetrics =<< getSystemSeconds
smpSubs <- getSubscribersMetrics subscribers
ntfSubs <- getSubscribersMetrics ntfSubscribers
loadedCounts <- loadedQueueCounts $ fromMsgStore ms
pure RealTimeMetrics {socketStats, threadsCount, clientsCount, deliveredSubs, smpSubs, ntfSubs, loadedCounts}
pure RealTimeMetrics {socketStats, threadsCount, clientsCount, deliveredSubs, deliveredTimes, smpSubs, ntfSubs, loadedCounts}
where
getSubscribersMetrics ServerSubscribers {queueSubscribers, serviceSubscribers, subClients} = do
subsCount <- M.size <$> getSubscribedClients queueSubscribers
subClientsCount <- IS.size <$> readTVarIO subClients
subServicesCount <- M.size <$> getSubscribedClients serviceSubscribers
pure RTSubscriberMetrics {subsCount, subClientsCount, subServicesCount}
getDeliveredMetrics = foldM countClnt (RTSubscriberMetrics 0 0 0) =<< getServerClients srv
countClnt metrics Client {subscriptions} = do
cnt <- foldM countSubs 0 =<< readTVarIO subscriptions
pure $ if cnt > 0
then metrics {subsCount = subsCount metrics + cnt, subClientsCount = subClientsCount metrics + 1}
else metrics
countSubs !cnt Sub {delivered} = (\empty -> if empty then cnt else cnt + 1) <$> atomically (isEmptyTMVar delivered)
getDeliveredMetrics (RoundedSystemTime ts') = foldM countClnt (RTSubscriberMetrics 0 0 0, TimeAggregations 0 0 IM.empty) =<< getServerClients srv
where
countClnt acc@(metrics, times) Client {subscriptions} = do
(cnt, times') <- foldM countSubs (0, times) =<< readTVarIO subscriptions
pure $ if cnt > 0
then (metrics {subsCount = subsCount metrics + cnt, subClientsCount = subClientsCount metrics + 1}, times')
else acc
countSubs acc@(!cnt, TimeAggregations {sumTime, maxTime, minuteBuckets}) Sub {delivered} = do
delivered_ <- atomically $ tryReadTMVar delivered
pure $ case delivered_ of
Nothing -> acc
Just (_, RoundedSystemTime ts) ->
let t = ts' - ts
mins = - fromIntegral ((- t) `div` 60) -- round up
times' =
TimeAggregations
{ sumTime = sumTime + t,
maxTime = max maxTime t,
minuteBuckets = IM.alter (Just . maybe 1 (+ 1)) mins minuteBuckets
}
in (cnt + 1, times')
runClient :: Transport c => X.CertificateChain -> C.APrivateSignKey -> TProxy c 'TServer -> c 'TServer -> M s ()
runClient srvCert srvSignKey tp h = do
@@ -1626,7 +1640,7 @@ client
-- This is tracked as "subscription" in the client to prevent these
-- clients from being able to subscribe.
pure s
getMessage_ :: Sub -> Maybe MsgId -> M s (Transmission BrokerMsg)
getMessage_ :: Sub -> Maybe (MsgId, RoundedSystemTime) -> M s (Transmission BrokerMsg)
getMessage_ s delivered_ = do
stats <- asks serverStats
fmap (either err id) $ liftIO $ runExceptT $
@@ -1634,7 +1648,8 @@ client
Just msg -> do
let encMsg = encryptMsg qr msg
incStat $ (if isJust delivered_ then msgGetDuplicate else msgGet) stats
atomically $ setDelivered s msg $> (corrId, entId, MSG encMsg)
ts <- liftIO getSystemSeconds
atomically $ setDelivered s msg ts $> (corrId, entId, MSG encMsg)
Nothing -> incStat (msgGetNoMsg stats) $> ok
withQueue :: (StoreQueue s -> QueueRec -> M s (Transmission BrokerMsg)) -> M s (Transmission BrokerMsg)
@@ -1743,10 +1758,10 @@ client
where
getDelivered :: Sub -> STM (Maybe ServerSub)
getDelivered Sub {delivered, subThread} = do
tryTakeTMVar delivered $>>= \msgId' ->
tryTakeTMVar delivered $>>= \v@(msgId', _) ->
if msgId == msgId' || B.null msgId
then pure $ Just subThread
else putTMVar delivered msgId' $> Nothing
else putTMVar delivered v $> Nothing
updateStats :: ServerStats -> Bool -> Message -> IO ()
updateStats stats isGet = \case
MessageQuota {} -> pure ()
@@ -1832,33 +1847,35 @@ client
-- the subscribed client var is read outside of STM to avoid transaction cost
-- in case no client is subscribed.
getSubscribedClient rId (queueSubscribers subscribers)
$>>= atomically . deliverToSub
$>>= deliverToSub
>>= mapM_ forkDeliver
where
rId = recipientId q
deliverToSub rcv =
deliverToSub rcv = do
ts <- getSystemSeconds
atomically $
-- reading client TVar in the same transaction,
-- so that if subscription ends, it re-evalutates
-- and delivery is cancelled -
-- the new client will receive message in response to SUB.
readTVar rcv
$>>= \rc@Client {subscriptions = subs, sndQ = sndQ'} -> TM.lookup rId subs
$>>= \s@Sub {subThread, delivered} -> case subThread of
ProhibitSub -> pure Nothing
ServerSub st -> readTVar st >>= \case
NoSub ->
tryTakeTMVar delivered >>= \case
Just _ -> pure Nothing -- if a message was already delivered, should not deliver more
Nothing ->
ifM
(isFullTBQueue sndQ')
(writeTVar st SubPending $> Just (rc, s, st))
(deliver sndQ' s $> Nothing)
_ -> pure Nothing
deliver sndQ' s = do
readTVar rcv
$>>= \rc@Client {subscriptions = subs, sndQ = sndQ'} -> TM.lookup rId subs
$>>= \s@Sub {subThread, delivered} -> case subThread of
ProhibitSub -> pure Nothing
ServerSub st -> readTVar st >>= \case
NoSub ->
tryTakeTMVar delivered >>= \case
Just _ -> pure Nothing -- if a message was already delivered, should not deliver more
Nothing ->
ifM
(isFullTBQueue sndQ')
(writeTVar st SubPending $> Just (rc, s, st))
(deliver sndQ' s ts $> Nothing)
_ -> pure Nothing
deliver sndQ' s ts = do
let encMsg = encryptMsg qr msg
writeTBQueue sndQ' [(CorrId "", rId, MSG encMsg)]
void $ setDelivered s msg
void $ setDelivered s msg ts
forkDeliver (rc@Client {sndQ = sndQ'}, s@Sub {delivered}, st) = do
t <- mkWeakThreadId =<< forkIO deliverThread
atomically $ modifyTVar' st $ \case
@@ -1871,13 +1888,14 @@ client
-- lookup can be outside of STM transaction,
-- as long as the check that it is the same client is inside.
getSubscribedClient rId (queueSubscribers subscribers) >>= mapM_ deliverIfSame
deliverIfSame rcv = time "deliver" . atomically $
whenM (sameClient rc rcv) $
deliverIfSame rcv = time "deliver" $ do
ts <- getSystemSeconds
atomically $ whenM (sameClient rc rcv) $
tryTakeTMVar delivered >>= \case
Just _ -> pure () -- if a message was already delivered, should not deliver more
Nothing -> do
-- a separate thread is needed because it blocks when client sndQ is full.
deliver sndQ' s
deliver sndQ' s ts
writeTVar st NoSub
enqueueNotification :: NtfCreds -> Message -> M s ()
@@ -1952,13 +1970,14 @@ client
VRFailed e -> Left (corrId', entId', ERR e)
deliverMessage :: T.Text -> QueueRec -> RecipientId -> Sub -> Maybe Message -> IO (Transmission BrokerMsg)
deliverMessage name qr rId s@Sub {subThread} msg_ = time (name <> " deliver") . atomically $
deliverMessage name qr rId s@Sub {subThread} msg_ = time (name <> " deliver") $
case subThread of
ProhibitSub -> pure resp
_ -> case msg_ of
Just msg ->
Just msg -> do
ts <- getSystemSeconds
let encMsg = encryptMsg qr msg
in setDelivered s msg $> (corrId, rId, MSG encMsg)
atomically (setDelivered s msg ts) $> (corrId, rId, MSG encMsg)
_ -> pure resp
where
resp = (corrId, rId, OK)
@@ -1976,8 +1995,10 @@ client
msgId' = messageId msg
msgTs' = messageTs msg
setDelivered :: Sub -> Message -> STM Bool
setDelivered s msg = tryPutTMVar (delivered s) $! messageId msg
setDelivered :: Sub -> Message -> RoundedSystemTime -> STM Bool
setDelivered Sub {delivered} msg !ts = do
let !msgId = messageId msg
tryPutTMVar delivered (msgId, ts)
delQueueAndMsgs :: (StoreQueue s, QueueRec) -> M s (Transmission BrokerMsg)
delQueueAndMsgs (q, QueueRec {rcvServiceId}) = do
@@ -2018,7 +2039,7 @@ client
SubPending -> QSubPending
SubThread _ -> QSubThread
ProhibitSub -> pure QProhibitSub
qDelivered <- atomically $ decodeLatin1 . encode <$$> tryReadTMVar delivered
qDelivered <- atomically $ decodeLatin1 . encode . fst <$$> tryReadTMVar delivered
pure QSub {qSubThread, qDelivered}
ok :: Transmission BrokerMsg
+1 -1
View File
@@ -409,7 +409,7 @@ data SubscriptionThread = NoSub | SubPending | SubThread (Weak ThreadId)
data Sub = Sub
{ subThread :: ServerSub, -- Nothing value indicates that sub
delivered :: TMVar MsgId
delivered :: TMVar (MsgId, RoundedSystemTime)
}
newServer :: IO (Server s)
@@ -5,7 +5,9 @@
module Simplex.Messaging.Server.Prometheus where
import Data.Bifunctor (first)
import Data.Int (Int64)
import qualified Data.IntMap as IM
import Data.Text (Text)
import qualified Data.Text as T
import Data.Time.Clock (UTCTime (..), diffUTCTime)
@@ -35,11 +37,18 @@ data RealTimeMetrics = RealTimeMetrics
threadsCount :: Int,
clientsCount :: Int,
deliveredSubs :: RTSubscriberMetrics,
deliveredTimes :: TimeAggregations,
smpSubs :: RTSubscriberMetrics,
ntfSubs :: RTSubscriberMetrics,
loadedCounts :: LoadedQueueCounts
}
data TimeAggregations = TimeAggregations
{ sumTime :: Int64,
maxTime :: Int64,
minuteBuckets :: IM.IntMap Int
}
data RTSubscriberMetrics = RTSubscriberMetrics
{ subsCount :: Int,
subClientsCount :: Int,
@@ -57,6 +66,7 @@ prometheusMetrics sm rtm ts =
threadsCount,
clientsCount,
deliveredSubs,
deliveredTimes,
smpSubs,
ntfSubs,
loadedCounts
@@ -436,6 +446,14 @@ prometheusMetrics sm rtm ts =
\# TYPE simplex_smp_delivered_clients_total gauge\n\
\simplex_smp_delivered_clients_total " <> mshow (subClientsCount deliveredSubs) <> "\n# delivered.subClientsCount\n\
\\n\
\# HELP simplex_smp_delivery_ack_time Times to confirm message delivery\n\
\# TYPE simplex_smp_delivery_ack_time histogram\n\
\simplex_smp_delivery_ack_time_max " <> mshow (maxTime deliveredTimes) <> "\n# delivered.maxTime\n\
\simplex_smp_delivery_ack_time_sum " <> mshow (sumTime deliveredTimes) <> "\n# delivered.sumTime\n\
\simplex_smp_delivery_ack_time_count " <> mshow (subsCount deliveredSubs) <> "\n# delivered.subsCount\n"
<> T.concat (map (showTimeBucket . first tshow) $ IM.assocs $ minuteBuckets deliveredTimes)
<> "simplex_smp_delivery_ack_time_bucket{le=\"+Inf\"} " <> mshow (subsCount deliveredSubs) <> "\n# delivered.minuteBuckets\n\
\\n\
\# HELP simplex_smp_subscribtion_total Total SMP subscriptions\n\
\# TYPE simplex_smp_subscribtion_total gauge\n\
\simplex_smp_subscribtion_total " <> mshow (subsCount smpSubs) <> "\n# smp.subsCount\n\
@@ -480,6 +498,8 @@ prometheusMetrics sm rtm ts =
\# TYPE simplex_smp_loaded_queues_ntf_lock_count gauge\n\
\simplex_smp_loaded_queues_ntf_lock_count " <> mshow (notifierLockCount loadedCounts) <> "\n# loadedCounts.notifierLockCount\n"
showTimeBucket :: (Text, Int) -> Text
showTimeBucket (minute, count) = "simplex_smp_delivery_ack_time_bucket{le=\"" <> minute <> "\"} " <> mshow count <> "\n# delivered.minuteBuckets\n"
socketsMetric :: (SocketStats -> Int) -> Text -> Text -> Text
socketsMetric sel metric descr =
"# HELP " <> metric <> " " <> descr <> "\n"
@@ -127,3 +127,6 @@ getRoundedSystemTime prec = (\t -> RoundedSystemTime $ (systemSeconds t `div` pr
getSystemDate :: IO RoundedSystemTime
getSystemDate = getRoundedSystemTime 86400
getSystemSeconds :: IO RoundedSystemTime
getSystemSeconds = RoundedSystemTime . systemSeconds <$> getSystemTime