diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index a5d86ab7f..0754b4431 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -683,24 +683,38 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt let threadsCount = 0 #endif clientsCount <- IM.size <$> getServerClients srv - deliveredSubs <- getDeliveredMetrics + (deliveredSubs, deliveredTimes) <- getDeliveredMetrics =<< getSystemSeconds smpSubs <- getSubscribersMetrics subscribers ntfSubs <- getSubscribersMetrics ntfSubscribers loadedCounts <- loadedQueueCounts $ fromMsgStore ms - pure RealTimeMetrics {socketStats, threadsCount, clientsCount, deliveredSubs, smpSubs, ntfSubs, loadedCounts} + pure RealTimeMetrics {socketStats, threadsCount, clientsCount, deliveredSubs, deliveredTimes, smpSubs, ntfSubs, loadedCounts} where getSubscribersMetrics ServerSubscribers {queueSubscribers, serviceSubscribers, subClients} = do subsCount <- M.size <$> getSubscribedClients queueSubscribers subClientsCount <- IS.size <$> readTVarIO subClients subServicesCount <- M.size <$> getSubscribedClients serviceSubscribers pure RTSubscriberMetrics {subsCount, subClientsCount, subServicesCount} - getDeliveredMetrics = foldM countClnt (RTSubscriberMetrics 0 0 0) =<< getServerClients srv - countClnt metrics Client {subscriptions} = do - cnt <- foldM countSubs 0 =<< readTVarIO subscriptions - pure $ if cnt > 0 - then metrics {subsCount = subsCount metrics + cnt, subClientsCount = subClientsCount metrics + 1} - else metrics - countSubs !cnt Sub {delivered} = (\empty -> if empty then cnt else cnt + 1) <$> atomically (isEmptyTMVar delivered) + getDeliveredMetrics (RoundedSystemTime ts') = foldM countClnt (RTSubscriberMetrics 0 0 0, TimeAggregations 0 0 IM.empty) =<< getServerClients srv + where + countClnt acc@(metrics, times) Client {subscriptions} = do + (cnt, times') <- foldM countSubs (0, times) =<< readTVarIO subscriptions + pure $ if cnt > 0 + then (metrics {subsCount = subsCount metrics + cnt, subClientsCount = subClientsCount metrics + 1}, times') + else acc + countSubs acc@(!cnt, TimeAggregations {sumTime, maxTime, minuteBuckets}) Sub {delivered} = do + delivered_ <- atomically $ tryReadTMVar delivered + pure $ case delivered_ of + Nothing -> acc + Just (_, RoundedSystemTime ts) -> + let t = ts' - ts + mins = - fromIntegral ((- t) `div` 60) -- round up + times' = + TimeAggregations + { sumTime = sumTime + t, + maxTime = max maxTime t, + minuteBuckets = IM.alter (Just . maybe 1 (+ 1)) mins minuteBuckets + } + in (cnt + 1, times') runClient :: Transport c => X.CertificateChain -> C.APrivateSignKey -> TProxy c 'TServer -> c 'TServer -> M s () runClient srvCert srvSignKey tp h = do @@ -1626,7 +1640,7 @@ client -- This is tracked as "subscription" in the client to prevent these -- clients from being able to subscribe. pure s - getMessage_ :: Sub -> Maybe MsgId -> M s (Transmission BrokerMsg) + getMessage_ :: Sub -> Maybe (MsgId, RoundedSystemTime) -> M s (Transmission BrokerMsg) getMessage_ s delivered_ = do stats <- asks serverStats fmap (either err id) $ liftIO $ runExceptT $ @@ -1634,7 +1648,8 @@ client Just msg -> do let encMsg = encryptMsg qr msg incStat $ (if isJust delivered_ then msgGetDuplicate else msgGet) stats - atomically $ setDelivered s msg $> (corrId, entId, MSG encMsg) + ts <- liftIO getSystemSeconds + atomically $ setDelivered s msg ts $> (corrId, entId, MSG encMsg) Nothing -> incStat (msgGetNoMsg stats) $> ok withQueue :: (StoreQueue s -> QueueRec -> M s (Transmission BrokerMsg)) -> M s (Transmission BrokerMsg) @@ -1743,10 +1758,10 @@ client where getDelivered :: Sub -> STM (Maybe ServerSub) getDelivered Sub {delivered, subThread} = do - tryTakeTMVar delivered $>>= \msgId' -> + tryTakeTMVar delivered $>>= \v@(msgId', _) -> if msgId == msgId' || B.null msgId then pure $ Just subThread - else putTMVar delivered msgId' $> Nothing + else putTMVar delivered v $> Nothing updateStats :: ServerStats -> Bool -> Message -> IO () updateStats stats isGet = \case MessageQuota {} -> pure () @@ -1832,33 +1847,35 @@ client -- the subscribed client var is read outside of STM to avoid transaction cost -- in case no client is subscribed. getSubscribedClient rId (queueSubscribers subscribers) - $>>= atomically . deliverToSub + $>>= deliverToSub >>= mapM_ forkDeliver where rId = recipientId q - deliverToSub rcv = + deliverToSub rcv = do + ts <- getSystemSeconds + atomically $ -- reading client TVar in the same transaction, -- so that if subscription ends, it re-evalutates -- and delivery is cancelled - -- the new client will receive message in response to SUB. - readTVar rcv - $>>= \rc@Client {subscriptions = subs, sndQ = sndQ'} -> TM.lookup rId subs - $>>= \s@Sub {subThread, delivered} -> case subThread of - ProhibitSub -> pure Nothing - ServerSub st -> readTVar st >>= \case - NoSub -> - tryTakeTMVar delivered >>= \case - Just _ -> pure Nothing -- if a message was already delivered, should not deliver more - Nothing -> - ifM - (isFullTBQueue sndQ') - (writeTVar st SubPending $> Just (rc, s, st)) - (deliver sndQ' s $> Nothing) - _ -> pure Nothing - deliver sndQ' s = do + readTVar rcv + $>>= \rc@Client {subscriptions = subs, sndQ = sndQ'} -> TM.lookup rId subs + $>>= \s@Sub {subThread, delivered} -> case subThread of + ProhibitSub -> pure Nothing + ServerSub st -> readTVar st >>= \case + NoSub -> + tryTakeTMVar delivered >>= \case + Just _ -> pure Nothing -- if a message was already delivered, should not deliver more + Nothing -> + ifM + (isFullTBQueue sndQ') + (writeTVar st SubPending $> Just (rc, s, st)) + (deliver sndQ' s ts $> Nothing) + _ -> pure Nothing + deliver sndQ' s ts = do let encMsg = encryptMsg qr msg writeTBQueue sndQ' [(CorrId "", rId, MSG encMsg)] - void $ setDelivered s msg + void $ setDelivered s msg ts forkDeliver (rc@Client {sndQ = sndQ'}, s@Sub {delivered}, st) = do t <- mkWeakThreadId =<< forkIO deliverThread atomically $ modifyTVar' st $ \case @@ -1871,13 +1888,14 @@ client -- lookup can be outside of STM transaction, -- as long as the check that it is the same client is inside. getSubscribedClient rId (queueSubscribers subscribers) >>= mapM_ deliverIfSame - deliverIfSame rcv = time "deliver" . atomically $ - whenM (sameClient rc rcv) $ + deliverIfSame rcv = time "deliver" $ do + ts <- getSystemSeconds + atomically $ whenM (sameClient rc rcv) $ tryTakeTMVar delivered >>= \case Just _ -> pure () -- if a message was already delivered, should not deliver more Nothing -> do -- a separate thread is needed because it blocks when client sndQ is full. - deliver sndQ' s + deliver sndQ' s ts writeTVar st NoSub enqueueNotification :: NtfCreds -> Message -> M s () @@ -1952,13 +1970,14 @@ client VRFailed e -> Left (corrId', entId', ERR e) deliverMessage :: T.Text -> QueueRec -> RecipientId -> Sub -> Maybe Message -> IO (Transmission BrokerMsg) - deliverMessage name qr rId s@Sub {subThread} msg_ = time (name <> " deliver") . atomically $ + deliverMessage name qr rId s@Sub {subThread} msg_ = time (name <> " deliver") $ case subThread of ProhibitSub -> pure resp _ -> case msg_ of - Just msg -> + Just msg -> do + ts <- getSystemSeconds let encMsg = encryptMsg qr msg - in setDelivered s msg $> (corrId, rId, MSG encMsg) + atomically (setDelivered s msg ts) $> (corrId, rId, MSG encMsg) _ -> pure resp where resp = (corrId, rId, OK) @@ -1976,8 +1995,10 @@ client msgId' = messageId msg msgTs' = messageTs msg - setDelivered :: Sub -> Message -> STM Bool - setDelivered s msg = tryPutTMVar (delivered s) $! messageId msg + setDelivered :: Sub -> Message -> RoundedSystemTime -> STM Bool + setDelivered Sub {delivered} msg !ts = do + let !msgId = messageId msg + tryPutTMVar delivered (msgId, ts) delQueueAndMsgs :: (StoreQueue s, QueueRec) -> M s (Transmission BrokerMsg) delQueueAndMsgs (q, QueueRec {rcvServiceId}) = do @@ -2018,7 +2039,7 @@ client SubPending -> QSubPending SubThread _ -> QSubThread ProhibitSub -> pure QProhibitSub - qDelivered <- atomically $ decodeLatin1 . encode <$$> tryReadTMVar delivered + qDelivered <- atomically $ decodeLatin1 . encode . fst <$$> tryReadTMVar delivered pure QSub {qSubThread, qDelivered} ok :: Transmission BrokerMsg diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index 7819c297e..25940ac32 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -409,7 +409,7 @@ data SubscriptionThread = NoSub | SubPending | SubThread (Weak ThreadId) data Sub = Sub { subThread :: ServerSub, -- Nothing value indicates that sub - delivered :: TMVar MsgId + delivered :: TMVar (MsgId, RoundedSystemTime) } newServer :: IO (Server s) diff --git a/src/Simplex/Messaging/Server/Prometheus.hs b/src/Simplex/Messaging/Server/Prometheus.hs index 5d9dcc786..2324f768c 100644 --- a/src/Simplex/Messaging/Server/Prometheus.hs +++ b/src/Simplex/Messaging/Server/Prometheus.hs @@ -5,7 +5,9 @@ module Simplex.Messaging.Server.Prometheus where +import Data.Bifunctor (first) import Data.Int (Int64) +import qualified Data.IntMap as IM import Data.Text (Text) import qualified Data.Text as T import Data.Time.Clock (UTCTime (..), diffUTCTime) @@ -35,11 +37,18 @@ data RealTimeMetrics = RealTimeMetrics threadsCount :: Int, clientsCount :: Int, deliveredSubs :: RTSubscriberMetrics, + deliveredTimes :: TimeAggregations, smpSubs :: RTSubscriberMetrics, ntfSubs :: RTSubscriberMetrics, loadedCounts :: LoadedQueueCounts } +data TimeAggregations = TimeAggregations + { sumTime :: Int64, + maxTime :: Int64, + minuteBuckets :: IM.IntMap Int + } + data RTSubscriberMetrics = RTSubscriberMetrics { subsCount :: Int, subClientsCount :: Int, @@ -57,6 +66,7 @@ prometheusMetrics sm rtm ts = threadsCount, clientsCount, deliveredSubs, + deliveredTimes, smpSubs, ntfSubs, loadedCounts @@ -436,6 +446,14 @@ prometheusMetrics sm rtm ts = \# TYPE simplex_smp_delivered_clients_total gauge\n\ \simplex_smp_delivered_clients_total " <> mshow (subClientsCount deliveredSubs) <> "\n# delivered.subClientsCount\n\ \\n\ + \# HELP simplex_smp_delivery_ack_time Times to confirm message delivery\n\ + \# TYPE simplex_smp_delivery_ack_time histogram\n\ + \simplex_smp_delivery_ack_time_max " <> mshow (maxTime deliveredTimes) <> "\n# delivered.maxTime\n\ + \simplex_smp_delivery_ack_time_sum " <> mshow (sumTime deliveredTimes) <> "\n# delivered.sumTime\n\ + \simplex_smp_delivery_ack_time_count " <> mshow (subsCount deliveredSubs) <> "\n# delivered.subsCount\n" + <> T.concat (map (showTimeBucket . first tshow) $ IM.assocs $ minuteBuckets deliveredTimes) + <> "simplex_smp_delivery_ack_time_bucket{le=\"+Inf\"} " <> mshow (subsCount deliveredSubs) <> "\n# delivered.minuteBuckets\n\ + \\n\ \# HELP simplex_smp_subscribtion_total Total SMP subscriptions\n\ \# TYPE simplex_smp_subscribtion_total gauge\n\ \simplex_smp_subscribtion_total " <> mshow (subsCount smpSubs) <> "\n# smp.subsCount\n\ @@ -480,6 +498,8 @@ prometheusMetrics sm rtm ts = \# TYPE simplex_smp_loaded_queues_ntf_lock_count gauge\n\ \simplex_smp_loaded_queues_ntf_lock_count " <> mshow (notifierLockCount loadedCounts) <> "\n# loadedCounts.notifierLockCount\n" + showTimeBucket :: (Text, Int) -> Text + showTimeBucket (minute, count) = "simplex_smp_delivery_ack_time_bucket{le=\"" <> minute <> "\"} " <> mshow count <> "\n# delivered.minuteBuckets\n" socketsMetric :: (SocketStats -> Int) -> Text -> Text -> Text socketsMetric sel metric descr = "# HELP " <> metric <> " " <> descr <> "\n" diff --git a/src/Simplex/Messaging/Server/QueueStore.hs b/src/Simplex/Messaging/Server/QueueStore.hs index cbac2bf08..9395f5bac 100644 --- a/src/Simplex/Messaging/Server/QueueStore.hs +++ b/src/Simplex/Messaging/Server/QueueStore.hs @@ -127,3 +127,6 @@ getRoundedSystemTime prec = (\t -> RoundedSystemTime $ (systemSeconds t `div` pr getSystemDate :: IO RoundedSystemTime getSystemDate = getRoundedSystemTime 86400 + +getSystemSeconds :: IO RoundedSystemTime +getSystemSeconds = RoundedSystemTime . systemSeconds <$> getSystemTime