diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 194cded59..03a672408 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -684,24 +684,29 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt let threadsCount = 0 #endif clientsCount <- IM.size <$> getServerClients srv - deliveredSubs <- getDeliveredMetrics + (deliveredSubs, deliveredTimes) <- getDeliveredMetrics =<< getSystemSeconds smpSubs <- getSubscribersMetrics subscribers ntfSubs <- getSubscribersMetrics ntfSubscribers loadedCounts <- loadedQueueCounts $ fromMsgStore ms - pure RealTimeMetrics {socketStats, threadsCount, clientsCount, deliveredSubs, smpSubs, ntfSubs, loadedCounts} + pure RealTimeMetrics {socketStats, threadsCount, clientsCount, deliveredSubs, deliveredTimes, smpSubs, ntfSubs, loadedCounts} where getSubscribersMetrics ServerSubscribers {queueSubscribers, serviceSubscribers, subClients} = do subsCount <- M.size <$> getSubscribedClients queueSubscribers subClientsCount <- IS.size <$> readTVarIO subClients subServicesCount <- M.size <$> getSubscribedClients serviceSubscribers pure RTSubscriberMetrics {subsCount, subClientsCount, subServicesCount} - getDeliveredMetrics = foldM countClnt (RTSubscriberMetrics 0 0 0) =<< getServerClients srv - countClnt metrics Client {subscriptions} = do - cnt <- foldM countSubs 0 =<< readTVarIO subscriptions - pure $ if cnt > 0 - then metrics {subsCount = subsCount metrics + cnt, subClientsCount = subClientsCount metrics + 1} - else metrics - countSubs !cnt Sub {delivered} = (\empty -> if empty then cnt else cnt + 1) <$> atomically (isEmptyTMVar delivered) + getDeliveredMetrics ts' = foldM countClnt (RTSubscriberMetrics 0 0 0, emptyTimeBuckets) =<< getServerClients srv + where + countClnt acc@(metrics, times) Client {subscriptions} = do + (cnt, times') <- foldM countSubs (0, times) =<< readTVarIO subscriptions + pure $ if cnt > 0 + then (metrics {subsCount = subsCount metrics + cnt, subClientsCount = subClientsCount metrics + 1}, times') + else acc + countSubs acc@(!cnt, times) Sub {delivered} = do + delivered_ <- atomically $ tryReadTMVar delivered + pure $ case delivered_ of + Nothing -> acc + Just (_, ts) -> (cnt + 1, updateTimeBuckets ts ts' times) runClient :: Transport c => X.CertificateChain -> C.APrivateSignKey -> TProxy c 'TServer -> c 'TServer -> M s () runClient srvCert srvSignKey tp h = do @@ -1633,7 +1638,7 @@ client -- This is tracked as "subscription" in the client to prevent these -- clients from being able to subscribe. pure s - getMessage_ :: Sub -> Maybe MsgId -> M s (Transmission BrokerMsg) + getMessage_ :: Sub -> Maybe (MsgId, RoundedSystemTime) -> M s (Transmission BrokerMsg) getMessage_ s delivered_ = do stats <- asks serverStats fmap (either err id) $ liftIO $ runExceptT $ @@ -1641,7 +1646,8 @@ client Just msg -> do let encMsg = encryptMsg qr msg incStat $ (if isJust delivered_ then msgGetDuplicate else msgGet) stats - atomically $ setDelivered s msg $> (corrId, entId, MSG encMsg) + ts <- liftIO getSystemSeconds + atomically $ setDelivered s msg ts $> (corrId, entId, MSG encMsg) Nothing -> incStat (msgGetNoMsg stats) $> ok withQueue :: (StoreQueue s -> QueueRec -> M s (Transmission BrokerMsg)) -> M s (Transmission BrokerMsg) @@ -1734,28 +1740,28 @@ client Nothing -> pure $ err NO_MSG Just sub -> atomically (getDelivered sub) >>= \case - Just st -> do + Just (st, ts) -> do stats <- asks serverStats fmap (either err id) $ liftIO $ runExceptT $ do case st of ProhibitSub -> do deletedMsg_ <- tryDelMsg ms q msgId - liftIO $ mapM_ (updateStats stats True) deletedMsg_ + liftIO $ mapM_ (updateStats stats True ts) deletedMsg_ pure ok _ -> do (deletedMsg_, msg_) <- tryDelPeekMsg ms q msgId - liftIO $ mapM_ (updateStats stats False) deletedMsg_ + liftIO $ mapM_ (updateStats stats False ts) deletedMsg_ liftIO $ deliverMessage "ACK" qr entId sub msg_ _ -> pure $ err NO_MSG where - getDelivered :: Sub -> STM (Maybe ServerSub) + getDelivered :: Sub -> STM (Maybe (ServerSub, RoundedSystemTime)) getDelivered Sub {delivered, subThread} = do - tryTakeTMVar delivered $>>= \msgId' -> + tryTakeTMVar delivered $>>= \v@(msgId', ts) -> if msgId == msgId' || B.null msgId - then pure $ Just subThread - else putTMVar delivered msgId' $> Nothing - updateStats :: ServerStats -> Bool -> Message -> IO () - updateStats stats isGet = \case + then pure $ Just (subThread, ts) + else putTMVar delivered v $> Nothing + updateStats :: ServerStats -> Bool -> RoundedSystemTime -> Message -> IO () + updateStats stats isGet deliveryTime = \case MessageQuota {} -> pure () Message {msgFlags} -> do incStat $ msgRecv stats @@ -1772,6 +1778,8 @@ client when (notification msgFlags) $ do incStat $ msgRecvNtf stats updatePeriodStats (activeQueuesNtf stats) entId + currTime <- getSystemSeconds + atomicModifyIORef'_ (msgRecvAckTimes stats) $ updateTimeBuckets deliveryTime currTime sendMessage :: MsgFlags -> MsgBody -> StoreQueue s -> QueueRec -> M s (Transmission BrokerMsg) sendMessage msgFlags msgBody q qr @@ -1839,33 +1847,35 @@ client -- the subscribed client var is read outside of STM to avoid transaction cost -- in case no client is subscribed. getSubscribedClient rId (queueSubscribers subscribers) - $>>= atomically . deliverToSub + $>>= deliverToSub >>= mapM_ forkDeliver where rId = recipientId q - deliverToSub rcv = + deliverToSub rcv = do + ts <- getSystemSeconds + atomically $ -- reading client TVar in the same transaction, -- so that if subscription ends, it re-evalutates -- and delivery is cancelled - -- the new client will receive message in response to SUB. - readTVar rcv - $>>= \rc@Client {subscriptions = subs, sndQ = sndQ'} -> TM.lookup rId subs - $>>= \s@Sub {subThread, delivered} -> case subThread of - ProhibitSub -> pure Nothing - ServerSub st -> readTVar st >>= \case - NoSub -> - tryTakeTMVar delivered >>= \case - Just _ -> pure Nothing -- if a message was already delivered, should not deliver more - Nothing -> - ifM - (isFullTBQueue sndQ') - (writeTVar st SubPending $> Just (rc, s, st)) - (deliver sndQ' s $> Nothing) - _ -> pure Nothing - deliver sndQ' s = do + readTVar rcv + $>>= \rc@Client {subscriptions = subs, sndQ = sndQ'} -> TM.lookup rId subs + $>>= \s@Sub {subThread, delivered} -> case subThread of + ProhibitSub -> pure Nothing + ServerSub st -> readTVar st >>= \case + NoSub -> + tryTakeTMVar delivered >>= \case + Just _ -> pure Nothing -- if a message was already delivered, should not deliver more + Nothing -> + ifM + (isFullTBQueue sndQ') + (writeTVar st SubPending $> Just (rc, s, st)) + (deliver sndQ' s ts $> Nothing) + _ -> pure Nothing + deliver sndQ' s ts = do let encMsg = encryptMsg qr msg writeTBQueue sndQ' [(CorrId "", rId, MSG encMsg)] - void $ setDelivered s msg + void $ setDelivered s msg ts forkDeliver (rc@Client {sndQ = sndQ'}, s@Sub {delivered}, st) = do t <- mkWeakThreadId =<< forkIO deliverThread atomically $ modifyTVar' st $ \case @@ -1878,13 +1888,14 @@ client -- lookup can be outside of STM transaction, -- as long as the check that it is the same client is inside. getSubscribedClient rId (queueSubscribers subscribers) >>= mapM_ deliverIfSame - deliverIfSame rcv = time "deliver" . atomically $ - whenM (sameClient rc rcv) $ + deliverIfSame rcv = time "deliver" $ do + ts <- getSystemSeconds + atomically $ whenM (sameClient rc rcv) $ tryTakeTMVar delivered >>= \case Just _ -> pure () -- if a message was already delivered, should not deliver more Nothing -> do -- a separate thread is needed because it blocks when client sndQ is full. - deliver sndQ' s + deliver sndQ' s ts writeTVar st NoSub enqueueNotification :: NtfCreds -> Message -> M s () @@ -1958,13 +1969,14 @@ client VRFailed e -> Left (corrId', entId', ERR e) deliverMessage :: T.Text -> QueueRec -> RecipientId -> Sub -> Maybe Message -> IO (Transmission BrokerMsg) - deliverMessage name qr rId s@Sub {subThread} msg_ = time (name <> " deliver") . atomically $ + deliverMessage name qr rId s@Sub {subThread} msg_ = time (name <> " deliver") $ case subThread of ProhibitSub -> pure resp _ -> case msg_ of - Just msg -> + Just msg -> do + ts <- getSystemSeconds let encMsg = encryptMsg qr msg - in setDelivered s msg $> (corrId, rId, MSG encMsg) + atomically (setDelivered s msg ts) $> (corrId, rId, MSG encMsg) _ -> pure resp where resp = (corrId, rId, OK) @@ -1982,8 +1994,10 @@ client msgId' = messageId msg msgTs' = messageTs msg - setDelivered :: Sub -> Message -> STM Bool - setDelivered s msg = tryPutTMVar (delivered s) $! messageId msg + setDelivered :: Sub -> Message -> RoundedSystemTime -> STM Bool + setDelivered Sub {delivered} msg !ts = do + let !msgId = messageId msg + tryPutTMVar delivered (msgId, ts) delQueueAndMsgs :: (StoreQueue s, QueueRec) -> M s (Transmission BrokerMsg) delQueueAndMsgs (q, QueueRec {rcvServiceId}) = do @@ -2024,7 +2038,7 @@ client SubPending -> QSubPending SubThread _ -> QSubThread ProhibitSub -> pure QProhibitSub - qDelivered <- atomically $ decodeLatin1 . encode <$$> tryReadTMVar delivered + qDelivered <- atomically $ decodeLatin1 . encode . fst <$$> tryReadTMVar delivered pure QSub {qSubThread, qDelivered} ok :: Transmission BrokerMsg diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index 627c6079a..a04a8acd8 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -412,7 +412,7 @@ data SubscriptionThread = NoSub | SubPending | SubThread (Weak ThreadId) data Sub = Sub { subThread :: ServerSub, -- Nothing value indicates that sub - delivered :: TMVar MsgId + delivered :: TMVar (MsgId, RoundedSystemTime) } newServer :: IO (Server s) diff --git a/src/Simplex/Messaging/Server/Prometheus.hs b/src/Simplex/Messaging/Server/Prometheus.hs index 5d9dcc786..cdcf10cfd 100644 --- a/src/Simplex/Messaging/Server/Prometheus.hs +++ b/src/Simplex/Messaging/Server/Prometheus.hs @@ -6,6 +6,8 @@ module Simplex.Messaging.Server.Prometheus where import Data.Int (Int64) +import qualified Data.IntMap.Strict as IM +import Data.List (mapAccumL) import Data.Text (Text) import qualified Data.Text as T import Data.Time.Clock (UTCTime (..), diffUTCTime) @@ -35,6 +37,7 @@ data RealTimeMetrics = RealTimeMetrics threadsCount :: Int, clientsCount :: Int, deliveredSubs :: RTSubscriberMetrics, + deliveredTimes :: TimeBuckets, smpSubs :: RTSubscriberMetrics, ntfSubs :: RTSubscriberMetrics, loadedCounts :: LoadedQueueCounts @@ -57,6 +60,7 @@ prometheusMetrics sm rtm ts = threadsCount, clientsCount, deliveredSubs, + deliveredTimes, smpSubs, ntfSubs, loadedCounts @@ -90,6 +94,7 @@ prometheusMetrics sm rtm ts = _msgSentLarge, _msgSentBlock, _msgRecv, + _msgRecvAckTimes, _msgRecvGet, _msgGet, _msgGetNoMsg, @@ -436,6 +441,25 @@ prometheusMetrics sm rtm ts = \# TYPE simplex_smp_delivered_clients_total gauge\n\ \simplex_smp_delivered_clients_total " <> mshow (subClientsCount deliveredSubs) <> "\n# delivered.subClientsCount\n\ \\n\ + \# HELP simplex_smp_delivery_ack_confirmed_time Times to confirm message delivery, only confirmed deliveries\n\ + \# TYPE simplex_smp_delivery_ack_confirmed_time histogram\n\ + \simplex_smp_delivery_ack_confirmed_time_sum " <> mshow (sumTime _msgRecvAckTimes) <> "\n\ + \simplex_smp_delivery_ack_confirmed_time_count " <> mshow (_msgRecv + _msgRecvGet) <> "\n" + <> showTimeBuckets "simplex_smp_delivery_ack_confirmed_time" (timeBuckets _msgRecvAckTimes) + <> showTimeBucket "simplex_smp_delivery_ack_confirmed_time" "+Inf" (_msgRecv + _msgRecvGet) + <> "\n\ + \# HELP simplex_smp_delivery_ack_confirmed_count Counts for confirmed deliveries\n\ + \# TYPE simplex_smp_delivery_ack_confirmed_count counter\n" + <> showBucketSums "simplex_smp_delivery_ack_confirmed_count" (timeBuckets _msgRecvAckTimes) + <> "\n\ + \# HELP simplex_smp_delivery_ack_pending_count Counts for pending delivery\n\ + \# TYPE simplex_smp_delivery_ack_pending_count gauge\n" + <> showBucketSums "simplex_smp_delivery_ack_pending_count" (timeBuckets deliveredTimes) + <> "\n\ + \# HELP simplex_smp_delivery_ack_time_max Max time to confirm message delivery\n\ + \# TYPE simplex_smp_delivery_ack_time_max gauge\n\ + \simplex_smp_delivery_ack_time_max " <> mshow (maxTime deliveredTimes) <> "\n# delivered.maxTime\n\ + \\n\ \# HELP simplex_smp_subscribtion_total Total SMP subscriptions\n\ \# TYPE simplex_smp_subscribtion_total gauge\n\ \simplex_smp_subscribtion_total " <> mshow (subsCount smpSubs) <> "\n# smp.subsCount\n\ @@ -480,15 +504,32 @@ prometheusMetrics sm rtm ts = \# TYPE simplex_smp_loaded_queues_ntf_lock_count gauge\n\ \simplex_smp_loaded_queues_ntf_lock_count " <> mshow (notifierLockCount loadedCounts) <> "\n# loadedCounts.notifierLockCount\n" + showTimeBuckets :: Text -> IM.IntMap Int -> Text + showTimeBuckets metric = T.concat . snd . mapAccumL accumBucket (0, 0) . IM.assocs + where + accumBucket (prevSec, total) (sec, cnt) = + let t + | sec - 60 > prevSec = showTimeBucket metric (tshow (sec - 60)) total + | otherwise = "" + in ((sec, total + cnt), t <> showTimeBucket metric (tshow sec) (total + cnt)) + showTimeBucket :: Text -> Text -> Int -> Text + showTimeBucket metric sec count = metric <> "_bucket{le=\"" <> sec <> "\"} " <> mshow count <> "\n" + showBucketSums :: Text -> IM.IntMap Int -> Text + showBucketSums metric buckets = T.concat $ map showBucketSum [(0, 60), (60, 300), (300, 1200), (1200, 3600), (3600, maxBound)] + where + showBucketSum (minTime, maxTime) = + metric <> "{period=\"" <> tshow minTime <> (if maxTime <= 3600 then "-" <> tshow maxTime else "+") <> "\"} " <> mshow bucketsSum <> "\n" + where + bucketsSum = IM.foldl' (+) 0 $ IM.filter (\sec -> minTime <= sec && sec < maxTime) buckets socketsMetric :: (SocketStats -> Int) -> Text -> Text -> Text socketsMetric sel metric descr = "# HELP " <> metric <> " " <> descr <> "\n" <> "# TYPE " <> metric <> " gauge\n" <> T.concat (map (\(port, ss) -> metric <> "{port=\"" <> T.pack port <> "\"} " <> mshow (sel ss) <> "\n") socketStats) <> "\n" - mstr a = a <> " " <> tsEpoch + mstr a = a <> " " <> tsEpoch ts mshow :: Show a => a -> Text mshow = mstr . tshow - tsEpoch = tshow @Int64 $ floor @Double $ realToFrac (ts `diffUTCTime` epoch) * 1000 + tsEpoch t = tshow @Int64 $ floor @Double $ realToFrac (t `diffUTCTime` epoch) * 1000 epoch = UTCTime systemEpochDay 0 {-# FOURMOLU_ENABLE\n#-} diff --git a/src/Simplex/Messaging/Server/QueueStore.hs b/src/Simplex/Messaging/Server/QueueStore.hs index cbac2bf08..9395f5bac 100644 --- a/src/Simplex/Messaging/Server/QueueStore.hs +++ b/src/Simplex/Messaging/Server/QueueStore.hs @@ -127,3 +127,6 @@ getRoundedSystemTime prec = (\t -> RoundedSystemTime $ (systemSeconds t `div` pr getSystemDate :: IO RoundedSystemTime getSystemDate = getRoundedSystemTime 86400 + +getSystemSeconds :: IO RoundedSystemTime +getSystemSeconds = RoundedSystemTime . systemSeconds <$> getSystemTime diff --git a/src/Simplex/Messaging/Server/Stats.hs b/src/Simplex/Messaging/Server/Stats.hs index da90b7216..50c611009 100644 --- a/src/Simplex/Messaging/Server/Stats.hs +++ b/src/Simplex/Messaging/Server/Stats.hs @@ -14,6 +14,8 @@ import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import Data.Hashable (hash) import Data.IORef +import Data.Int (Int64) +import qualified Data.IntMap.Strict as IM import Data.IntSet (IntSet) import qualified Data.IntSet as IS import Data.Set (Set) @@ -25,6 +27,7 @@ import Data.Time.Clock (UTCTime (..)) import GHC.IORef (atomicSwapIORef) import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol (EntityId (..)) +import Simplex.Messaging.Server.QueueStore (RoundedSystemTime (..)) import Simplex.Messaging.Util (atomicModifyIORef'_, tshow, unlessM) data ServerStats = ServerStats @@ -57,6 +60,7 @@ data ServerStats = ServerStats msgSentLarge :: IORef Int, msgSentBlock :: IORef Int, msgRecv :: IORef Int, + msgRecvAckTimes :: IORef TimeBuckets, msgRecvGet :: IORef Int, msgGet :: IORef Int, msgGetNoMsg :: IORef Int, @@ -115,6 +119,7 @@ data ServerStatsData = ServerStatsData _msgSentLarge :: Int, _msgSentBlock :: Int, _msgRecv :: Int, + _msgRecvAckTimes :: TimeBuckets, _msgRecvGet :: Int, _msgGet :: Int, _msgGetNoMsg :: Int, @@ -174,6 +179,7 @@ newServerStats ts = do msgSentLarge <- newIORef 0 msgSentBlock <- newIORef 0 msgRecv <- newIORef 0 + msgRecvAckTimes <- newIORef $ TimeBuckets 0 0 IM.empty msgRecvGet <- newIORef 0 msgGet <- newIORef 0 msgGetNoMsg <- newIORef 0 @@ -230,6 +236,7 @@ newServerStats ts = do msgSentLarge, msgSentBlock, msgRecv, + msgRecvAckTimes, msgRecvGet, msgGet, msgGetNoMsg, @@ -288,6 +295,7 @@ getServerStatsData s = do _msgSentLarge <- readIORef $ msgSentLarge s _msgSentBlock <- readIORef $ msgSentBlock s _msgRecv <- readIORef $ msgRecv s + _msgRecvAckTimes <- readIORef $ msgRecvAckTimes s _msgRecvGet <- readIORef $ msgRecvGet s _msgGet <- readIORef $ msgGet s _msgGetNoMsg <- readIORef $ msgGetNoMsg s @@ -344,6 +352,7 @@ getServerStatsData s = do _msgSentLarge, _msgSentBlock, _msgRecv, + _msgRecvAckTimes, _msgRecvGet, _msgGet, _msgGetNoMsg, @@ -403,6 +412,7 @@ setServerStats s d = do writeIORef (msgSentLarge s) $! _msgSentLarge d writeIORef (msgSentBlock s) $! _msgSentBlock d writeIORef (msgRecv s) $! _msgRecv d + writeIORef (msgRecvAckTimes s) $! _msgRecvAckTimes d writeIORef (msgRecvGet s) $! _msgRecvGet d writeIORef (msgGet s) $! _msgGet d writeIORef (msgGetNoMsg s) $! _msgGetNoMsg d @@ -592,6 +602,7 @@ instance StrEncoding ServerStatsData where _msgSentLarge, _msgSentBlock, _msgRecv, + _msgRecvAckTimes = emptyTimeBuckets, _msgRecvGet, _msgGet, _msgGetNoMsg, @@ -944,3 +955,33 @@ instance StrEncoding ServiceStatsData where _srvSubQueues, _srvSubEnd } + +data TimeBuckets = TimeBuckets + { sumTime :: Int64, + maxTime :: Int64, + timeBuckets :: IM.IntMap Int + } + deriving (Show) + +emptyTimeBuckets :: TimeBuckets +emptyTimeBuckets = TimeBuckets 0 0 IM.empty + +updateTimeBuckets :: RoundedSystemTime -> RoundedSystemTime -> TimeBuckets -> TimeBuckets +updateTimeBuckets + (RoundedSystemTime deliveryTime) + (RoundedSystemTime currTime) + TimeBuckets {sumTime, maxTime, timeBuckets} = + TimeBuckets + { sumTime = sumTime + t, + maxTime = max maxTime t, + timeBuckets = IM.alter (Just . maybe 1 (+ 1)) seconds timeBuckets + } + where + t = currTime - deliveryTime + seconds + | t <= 5 = fromIntegral t + | t <= 30 = t `toBucket` 5 + | t <= 60 = t `toBucket` 10 + | t <= 180 = t `toBucket` 30 + | otherwise = t `toBucket` 60 + toBucket n m = - fromIntegral (((- n) `div` m) * m) -- round up