From 46ff37c362050c8b933b89fe4c12d0e1ffc71f60 Mon Sep 17 00:00:00 2001 From: Evgeny Date: Mon, 9 Jun 2025 14:14:27 +0100 Subject: [PATCH] ntf server: additional statistics (#1558) * ntf server: additional statistics * version * fix stats * add stats to track notifications without active token * refactor * fix stats parser * version --- src/Simplex/Messaging/Client/Agent.hs | 1 - src/Simplex/Messaging/Encoding/String.hs | 6 +- src/Simplex/Messaging/Notifications/Server.hs | 41 +++++---- .../Messaging/Notifications/Server/Env.hs | 2 +- .../Notifications/Server/Prometheus.hs | 58 +++++++++---- .../Messaging/Notifications/Server/Stats.hs | 83 +++++++++++++++++++ 6 files changed, 158 insertions(+), 33 deletions(-) diff --git a/src/Simplex/Messaging/Client/Agent.hs b/src/Simplex/Messaging/Client/Agent.hs index 9c2238551..b63b69e32 100644 --- a/src/Simplex/Messaging/Client/Agent.hs +++ b/src/Simplex/Messaging/Client/Agent.hs @@ -74,7 +74,6 @@ import Simplex.Messaging.Protocol subscriberParty, subscriberServiceRole ) -import qualified Simplex.Messaging.Protocol as SMP import Simplex.Messaging.Session import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM diff --git a/src/Simplex/Messaging/Encoding/String.hs b/src/Simplex/Messaging/Encoding/String.hs index 922ff2266..d254aaaa6 100644 --- a/src/Simplex/Messaging/Encoding/String.hs +++ b/src/Simplex/Messaging/Encoding/String.hs @@ -43,7 +43,7 @@ import qualified Data.X509 as X import qualified Data.X509.Validation as XV import Simplex.Messaging.Encoding import Simplex.Messaging.Parsers (parseAll) -import Simplex.Messaging.Util (bshow, (<$?>)) +import Simplex.Messaging.Util (bshow, safeDecodeUtf8, (<$?>)) class TextEncoding a where textEncode :: a -> Text @@ -91,6 +91,10 @@ instance StrEncoding String where strEncode = strEncode . B.pack strP = B.unpack <$> strP +instance StrEncoding Text where + strEncode = encodeUtf8 + strP = safeDecodeUtf8 <$> A.takeTill (\c -> c == ' ' || c == '\n') + instance ToJSON Str where toJSON (Str s) = strToJSON s toEncoding (Str s) = strToJEncoding s diff --git a/src/Simplex/Messaging/Notifications/Server.hs b/src/Simplex/Messaging/Notifications/Server.hs index 7b325e10d..98f83f4d3 100644 --- a/src/Simplex/Messaging/Notifications/Server.hs +++ b/src/Simplex/Messaging/Notifications/Server.hs @@ -281,7 +281,7 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg, startOptions} getSMPServiceSubMetrics a sel subQueueCount = getSubMetrics_ a sel countSubs where countSubs :: (NtfSMPSubMetrics, S.Set Text) -> (SMPServer, TVar (Maybe sub)) -> IO (NtfSMPSubMetrics, S.Set Text) - countSubs acc (srv, serviceSubs) = maybe acc (subMetricsResult a acc srv . fromIntegral . subQueueCount) <$> readTVarIO serviceSubs + countSubs acc (srv, serviceSubs) = subMetricsResult a acc srv . fromIntegral . maybe 0 subQueueCount <$> readTVarIO serviceSubs getSMPSubMetrics :: SMPClientAgent 'Notifier -> (SMPClientAgent 'Notifier -> TMap SMPServer (TMap NotifierId a)) -> IO NtfSMPSubMetrics getSMPSubMetrics a sel = getSubMetrics_ a sel countSubs @@ -305,11 +305,11 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg, startOptions} | isOwnServer a srv = let !ownSrvSubs' = M.alter (Just . maybe cnt (+ cnt)) host ownSrvSubs metrics' = metrics {ownSrvSubs = ownSrvSubs'} :: NtfSMPSubMetrics - in (metrics', otherSrvs) + in (metrics', otherSrvs) | cnt == 0 = acc | otherwise = let metrics' = metrics {otherSrvSubCount = otherSrvSubCount + cnt} :: NtfSMPSubMetrics - in (metrics', S.insert host otherSrvs) + in (metrics', S.insert host otherSrvs) where NtfSMPSubMetrics {ownSrvSubs, otherSrvSubCount} = metrics host = safeDecodeUtf8 $ strEncode h @@ -527,7 +527,7 @@ ntfSubscriber NtfSubscriber {smpAgent = ca@SMPClientAgent {msgQ, agentQ}} = NtfPushServer {pushQ} <- asks pushServer stats <- asks serverStats liftIO $ forever $ do - ((_, srv, _), _thVersion, sessionId, ts) <- atomically $ readTBQueue msgQ + ((_, srv@(SMPServer (h :| _) _ _), _), _thVersion, sessionId, ts) <- atomically $ readTBQueue msgQ forM ts $ \(ntfId, t) -> case t of STUnexpectedError e -> logError $ "SMP client unexpected error: " <> tshow e -- uncorrelated response, should not happen STResponse {} -> pure () -- it was already reported as timeout error @@ -538,9 +538,16 @@ ntfSubscriber NtfSubscriber {smpAgent = ca@SMPClientAgent {msgQ, agentQ}} = ntfTs <- getSystemTime updatePeriodStats (activeSubs stats) ntfId let newNtf = PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta} - ntfs_ <- addTokenLastNtf st newNtf - forM_ ntfs_ $ \(tkn, lastNtfs) -> atomically $ writeTBQueue pushQ (tkn, PNMessage lastNtfs) - incNtfStat_ stats ntfReceived + srvHost_ = if isOwnServer ca srv then Just (safeDecodeUtf8 $ strEncode h) else Nothing + addTokenLastNtf st newNtf >>= \case + Right (tkn, lastNtfs) -> do + atomically $ writeTBQueue pushQ (srvHost_, tkn, PNMessage lastNtfs) + incNtfStat_ stats ntfReceived + mapM_ (`incServerStat` ntfReceivedOwn stats) srvHost_ + Left AUTH -> do + incNtfStat_ stats ntfReceivedAuth + mapM_ (`incServerStat` ntfReceivedAuthOwn stats) srvHost_ + Left _ -> pure () Right SMP.END -> whenM (atomically $ activeClientSession' ca sessionId srv) $ void $ updateSrvSubStatus st smpQueue NSEnd @@ -625,7 +632,7 @@ showServer' = decodeLatin1 . strEncode . host ntfPush :: NtfPushServer -> M () ntfPush s@NtfPushServer {pushQ} = forever $ do - (tkn@NtfTknRec {ntfTknId, token = t@(DeviceToken pp _), tknStatus}, ntf) <- atomically (readTBQueue pushQ) + (srvHost_, tkn@NtfTknRec {ntfTknId, token = t@(DeviceToken pp _), tknStatus}, ntf) <- atomically (readTBQueue pushQ) liftIO $ logDebug $ "sending push notification to " <> T.pack (show pp) st <- asks store case ntf of @@ -644,8 +651,14 @@ ntfPush s@NtfPushServer {pushQ} = forever $ do PNMessage {} -> checkActiveTkn tknStatus $ do stats <- asks serverStats liftIO $ updatePeriodStats (activeTokens stats) ntfTknId - liftIO (deliverNotification st pp tkn ntf) - >>= incNtfStatT t . (\case Left _ -> ntfFailed; Right () -> ntfDelivered) + liftIO (deliverNotification st pp tkn ntf) >>= \case + Left _ -> do + incNtfStatT t ntfFailed + liftIO $ mapM_ (`incServerStat` ntfFailedOwn stats) srvHost_ + Right () -> do + incNtfStatT t ntfDelivered + liftIO $ mapM_ (`incServerStat` ntfDeliveredOwn stats) srvHost_ + where checkActiveTkn :: NtfTknStatus -> M () -> M () checkActiveTkn status action @@ -686,7 +699,7 @@ periodicNtfsThread NtfPushServer {pushQ} = do liftIO $ forever $ do threadDelay interval now <- systemSeconds <$> getSystemTime - cnt <- withPeriodicNtfTokens st now $ \tkn -> atomically $ writeTBQueue pushQ (tkn, PNCheckMessages) + cnt <- withPeriodicNtfTokens st now $ \tkn -> atomically $ writeTBQueue pushQ (Nothing, tkn, PNCheckMessages) logNote $ "Scheduled periodic notifications: " <> tshow cnt runNtfClientTransport :: Transport c => THandleNTF c 'TServer -> M () @@ -794,7 +807,7 @@ client NtfServerClient {rcvQ, sndQ} ns@NtfSubscriber {smpAgent = ca} NtfPushServ ts <- liftIO $ getSystemDate let tkn = mkNtfTknRec tknId newTkn srvDhPrivKey dhSecret regCode ts withNtfStore (`addNtfToken` tkn) $ \_ -> do - atomically $ writeTBQueue pushQ (tkn, PNVerification regCode) + atomically $ writeTBQueue pushQ (Nothing, tkn, PNVerification regCode) incNtfStatT token ntfVrfQueued incNtfStatT token tknCreated pure $ NRTknId tknId srvDhPubKey @@ -810,7 +823,7 @@ client NtfServerClient {rcvQ, sndQ} ns@NtfSubscriber {smpAgent = ca} NtfPushServ | otherwise -> withNtfStore (\st -> updateTknStatus st tkn NTRegistered) $ \_ -> sendVerification where sendVerification = do - atomically $ writeTBQueue pushQ (tkn, PNVerification tknRegCode) + atomically $ writeTBQueue pushQ (Nothing, tkn, PNVerification tknRegCode) incNtfStatT token ntfVrfQueued pure $ NRTknId ntfTknId $ C.publicKey tknDhPrivKey TVFY code -- this allows repeated verification for cases when client connection dropped before server response @@ -828,7 +841,7 @@ client NtfServerClient {rcvQ, sndQ} ns@NtfSubscriber {smpAgent = ca} NtfPushServ regCode <- getRegCode let tkn' = tkn {token = token', tknStatus = NTRegistered, tknRegCode = regCode} withNtfStore (`replaceNtfToken` tkn') $ \_ -> do - atomically $ writeTBQueue pushQ (tkn', PNVerification regCode) + atomically $ writeTBQueue pushQ (Nothing, tkn', PNVerification regCode) incNtfStatT token ntfVrfQueued incNtfStatT token tknReplaced pure NROk diff --git a/src/Simplex/Messaging/Notifications/Server/Env.hs b/src/Simplex/Messaging/Notifications/Server/Env.hs index e99350c7d..da5ca3ffb 100644 --- a/src/Simplex/Messaging/Notifications/Server/Env.hs +++ b/src/Simplex/Messaging/Notifications/Server/Env.hs @@ -146,7 +146,7 @@ data SMPSubscriber = SMPSubscriber } data NtfPushServer = NtfPushServer - { pushQ :: TBQueue (NtfTknRec, PushNotification), + { pushQ :: TBQueue (Maybe T.Text, NtfTknRec, PushNotification), -- Maybe Text is a hostname of "own" server pushClients :: TMap PushProvider PushProviderClient, apnsConfig :: APNSPushClientConfig } diff --git a/src/Simplex/Messaging/Notifications/Server/Prometheus.hs b/src/Simplex/Messaging/Notifications/Server/Prometheus.hs index faaa56951..b86eed685 100644 --- a/src/Simplex/Messaging/Notifications/Server/Prometheus.hs +++ b/src/Simplex/Messaging/Notifications/Server/Prometheus.hs @@ -76,8 +76,13 @@ ntfPrometheusMetrics sm rtm ts = _subCreated, _subDeleted, _ntfReceived, + _ntfReceivedAuth, _ntfDelivered, _ntfFailed, + _ntfReceivedOwn, + _ntfReceivedAuthOwn, + _ntfDeliveredOwn, + _ntfFailedOwn, _ntfCronDelivered, _ntfCronFailed, _ntfVrfQueued, @@ -165,6 +170,10 @@ ntfPrometheusMetrics sm rtm ts = \# TYPE simplex_ntf_notifications_received counter\n\ \simplex_ntf_notifications_received " <> mshow _ntfReceived <> "\n# ntfReceived\n\ \\n\ + \# HELP simplex_ntf_notifications_received_auth Received notifications without token or subscription (AUTH error)\n\ + \# TYPE simplex_ntf_notifications_received_auth counter\n\ + \simplex_ntf_notifications_received_auth " <> mshow _ntfReceivedAuth <> "\n# ntfReceivedAuth\n\ + \\n\ \# HELP simplex_ntf_notifications_delivered Delivered notifications\n\ \# TYPE simplex_ntf_notifications_delivered counter\n\ \simplex_ntf_notifications_delivered " <> mshow _ntfDelivered <> "\n# ntfDelivered\n\ @@ -201,6 +210,10 @@ ntfPrometheusMetrics sm rtm ts = \# TYPE simplex_ntf_notifications_total gauge\n\ \simplex_ntf_notifications_total " <> mshow lastNtfCount <> "\n# lastNtfCount\n\ \\n" + <> showNtfsByServer _ntfReceivedOwn "simplex_ntf_notifications_received_own" "Received notifications" "ntfReceivedOwn" + <> showNtfsByServer _ntfReceivedAuthOwn "simplex_ntf_notifications_received_auth_own" "Received notifications without token or subscription (AUTH error)" "ntfReceivedAuthOwn" + <> showNtfsByServer _ntfDeliveredOwn "simplex_ntf_notifications_delivered_own" "Delivered notifications" "ntfDeliveredOwn" + <> showNtfsByServer _ntfFailedOwn "simplex_ntf_notifications_failed_own" "Failed notifications" "ntfFailedOwn" info = "# Info\n\ \# ----\n\ @@ -228,28 +241,41 @@ ntfPrometheusMetrics sm rtm ts = showOwnSrvSubs <> showOtherSrvSubs where showOwnSrvSubs - | M.null ownSrvSubs = showOwn_ "" 0 0 - | otherwise = T.concat $ map (\(host, cnt) -> showOwn_ (metricHost host) 1 cnt) $ M.assocs ownSrvSubs - showOwn_ param srvCnt subCnt = - gaugeMetric (mPfx <> "server_count_own") param srvCnt (descrPfx <> " SMP subscriptions, own server count") "ownSrvSubs server" - <> gaugeMetric (mPfx <> "sub_count_own") param subCnt (descrPfx <> " SMP subscriptions count for own servers") "ownSrvSubs count" + | M.null ownSrvSubs = "" + | otherwise = + gaugeMetrics (mPfx <> "server_count_own") srvMetrics (descrPfx <> " SMP subscriptions, own server count") "ownSrvSubs server" + <> gaugeMetrics (mPfx <> "sub_count_own") subMetrics (descrPfx <> " SMP subscriptions count for own servers") "ownSrvSubs count" + where + subs = M.assocs ownSrvSubs + srvMetrics = map (\(host, _) -> (metricHost host, 1)) subs + subMetrics = map (\(host, cnt) -> (metricHost host, cnt)) subs showOtherSrvSubs = - gaugeMetric (mPfx <> "server_count_other") "" otherServers (descrPfx <> " SMP subscriptions, other server count") "otherServers" - <> gaugeMetric (mPfx <> "sub_count_other") "" otherSrvSubCount (descrPfx <> " SMP subscriptions count for other servers") "otherSrvSubCount" + gaugeMetrics (mPfx <> "server_count_other") [("", otherServers)] (descrPfx <> " SMP subscriptions, other server count") "otherServers" + <> gaugeMetrics (mPfx <> "sub_count_other") [("", otherSrvSubCount)] (descrPfx <> " SMP subscriptions count for other servers") "otherSrvSubCount" + showNtfsByServer (StatsByServerData srvNtfs) mName descr varName + | null srvNtfs = "" + | otherwise = + "# HELP " <> mName <> " " <> descr <> "\n\ + \# TYPE " <> mName <> " counter\n" + <> showNtfMetrics + <> "# " <> varName <> "\n\n" + where + showNtfMetrics = T.concat $ map (\(host, value) -> mName <> metricHost host <> " " <> mshow value <> "\n") srvNtfs showWorkerMetric NtfSMPWorkerMetrics {ownServers, otherServers} mPfx descrPfx = showOwnServers <> showOtherServers where showOwnServers - | null ownServers = showOwn_ "" 0 - | otherwise = T.concat $ map (\host -> showOwn_ (metricHost host) 1) ownServers - showOwn_ param cnt = gaugeMetric (mPfx <> "count_own") param cnt (descrPfx <> " count for own servers") "ownServers" - showOtherServers = gaugeMetric (mPfx <> "count_other") "" otherServers (descrPfx <> " count for other servers") "otherServers" - gaugeMetric :: Text -> Text -> Int -> Text -> Text -> Text - gaugeMetric name param value descr codeRef = + | null ownServers = "" + | otherwise = gaugeMetrics (mPfx <> "count_own") subMetrics (descrPfx <> " count for own servers") "ownServers" + where + subMetrics = map (\host -> (metricHost host, 1)) ownServers + showOtherServers = gaugeMetrics (mPfx <> "count_other") [("", otherServers)] (descrPfx <> " count for other servers") "otherServers" + gaugeMetrics :: Text -> [(Text, Int)] -> Text -> Text -> Text + gaugeMetrics name subMetrics descr codeRef = "# HELP " <> name <> " " <> descr <> "\n\ - \# TYPE " <> name <> " gauge\n\ - \" <> name <> param <> " " <> mshow value <> "\n# " <> codeRef <> "\n\ - \\n" + \# TYPE " <> name <> " gauge\n" + <> T.concat (map (\(param, value) -> name <> param <> " " <> mshow value <> "\n") subMetrics) + <> "# " <> codeRef <> "\n\n" metricHost host = "{server=\"" <> host <> "\"}" mstr a = a <> " " <> tsEpoch mshow :: Show a => a -> Text diff --git a/src/Simplex/Messaging/Notifications/Server/Stats.hs b/src/Simplex/Messaging/Notifications/Server/Stats.hs index eeff48c76..a20e41c34 100644 --- a/src/Simplex/Messaging/Notifications/Server/Stats.hs +++ b/src/Simplex/Messaging/Notifications/Server/Stats.hs @@ -5,12 +5,17 @@ module Simplex.Messaging.Notifications.Server.Stats where import Control.Applicative (optional, (<|>)) +import Control.Concurrent.STM import qualified Data.Attoparsec.ByteString.Char8 as A import qualified Data.ByteString.Char8 as B import Data.IORef +import qualified Data.Map.Strict as M +import Data.Text (Text) import Data.Time.Clock (UTCTime) import Simplex.Messaging.Encoding.String import Simplex.Messaging.Server.Stats +import Simplex.Messaging.TMap (TMap) +import qualified Simplex.Messaging.TMap as TM data NtfServerStats = NtfServerStats { fromTime :: IORef UTCTime, @@ -21,8 +26,13 @@ data NtfServerStats = NtfServerStats subCreated :: IORef Int, subDeleted :: IORef Int, ntfReceived :: IORef Int, + ntfReceivedAuth :: IORef Int, ntfDelivered :: IORef Int, ntfFailed :: IORef Int, + ntfReceivedOwn :: StatsByServer, + ntfReceivedAuthOwn :: StatsByServer, + ntfDeliveredOwn :: StatsByServer, + ntfFailedOwn :: StatsByServer, ntfCronDelivered :: IORef Int, ntfCronFailed :: IORef Int, ntfVrfQueued :: IORef Int, @@ -42,8 +52,13 @@ data NtfServerStatsData = NtfServerStatsData _subCreated :: Int, _subDeleted :: Int, _ntfReceived :: Int, + _ntfReceivedAuth :: Int, _ntfDelivered :: Int, _ntfFailed :: Int, + _ntfReceivedOwn :: StatsByServerData, + _ntfReceivedAuthOwn :: StatsByServerData, + _ntfDeliveredOwn :: StatsByServerData, + _ntfFailedOwn :: StatsByServerData, _ntfCronDelivered :: Int, _ntfCronFailed :: Int, _ntfVrfQueued :: Int, @@ -64,8 +79,13 @@ newNtfServerStats ts = do subCreated <- newIORef 0 subDeleted <- newIORef 0 ntfReceived <- newIORef 0 + ntfReceivedAuth <- newIORef 0 ntfDelivered <- newIORef 0 ntfFailed <- newIORef 0 + ntfReceivedOwn <- TM.emptyIO + ntfReceivedAuthOwn <- TM.emptyIO + ntfDeliveredOwn <- TM.emptyIO + ntfFailedOwn <- TM.emptyIO ntfCronDelivered <- newIORef 0 ntfCronFailed <- newIORef 0 ntfVrfQueued <- newIORef 0 @@ -84,8 +104,13 @@ newNtfServerStats ts = do subCreated, subDeleted, ntfReceived, + ntfReceivedAuth, ntfDelivered, ntfFailed, + ntfReceivedOwn, + ntfReceivedAuthOwn, + ntfDeliveredOwn, + ntfFailedOwn, ntfCronDelivered, ntfCronFailed, ntfVrfQueued, @@ -106,8 +131,13 @@ getNtfServerStatsData s@NtfServerStats {fromTime} = do _subCreated <- readIORef $ subCreated s _subDeleted <- readIORef $ subDeleted s _ntfReceived <- readIORef $ ntfReceived s + _ntfReceivedAuth <- readIORef $ ntfReceivedAuth s _ntfDelivered <- readIORef $ ntfDelivered s _ntfFailed <- readIORef $ ntfFailed s + _ntfReceivedOwn <- getStatsByServer $ ntfReceivedOwn s + _ntfReceivedAuthOwn <- getStatsByServer $ ntfReceivedAuthOwn s + _ntfDeliveredOwn <- getStatsByServer $ ntfDeliveredOwn s + _ntfFailedOwn <- getStatsByServer $ ntfFailedOwn s _ntfCronDelivered <- readIORef $ ntfCronDelivered s _ntfCronFailed <- readIORef $ ntfCronFailed s _ntfVrfQueued <- readIORef $ ntfVrfQueued s @@ -126,8 +156,13 @@ getNtfServerStatsData s@NtfServerStats {fromTime} = do _subCreated, _subDeleted, _ntfReceived, + _ntfReceivedAuth, _ntfDelivered, _ntfFailed, + _ntfReceivedOwn, + _ntfReceivedAuthOwn, + _ntfDeliveredOwn, + _ntfFailedOwn, _ntfCronDelivered, _ntfCronFailed, _ntfVrfQueued, @@ -149,8 +184,13 @@ setNtfServerStats s@NtfServerStats {fromTime} d@NtfServerStatsData {_fromTime} = writeIORef (subCreated s) $! _subCreated d writeIORef (subDeleted s) $! _subDeleted d writeIORef (ntfReceived s) $! _ntfReceived d + writeIORef (ntfReceivedAuth s) $! _ntfReceivedAuth d writeIORef (ntfDelivered s) $! _ntfDelivered d writeIORef (ntfFailed s) $! _ntfFailed d + setStatsByServer (ntfReceivedOwn s) $! _ntfReceivedOwn d + setStatsByServer (ntfReceivedAuthOwn s) $! _ntfReceivedAuthOwn d + setStatsByServer (ntfDeliveredOwn s) $! _ntfDeliveredOwn d + setStatsByServer (ntfFailedOwn s) $! _ntfFailedOwn d writeIORef (ntfCronDelivered s) $! _ntfCronDelivered d writeIORef (ntfCronFailed s) $! _ntfCronFailed d writeIORef (ntfVrfQueued s) $! _ntfVrfQueued d @@ -171,8 +211,13 @@ instance StrEncoding NtfServerStatsData where _subCreated, _subDeleted, _ntfReceived, + _ntfReceivedAuth, _ntfDelivered, _ntfFailed, + _ntfReceivedOwn, + _ntfReceivedAuthOwn, + _ntfDeliveredOwn, + _ntfFailedOwn, _ntfCronDelivered, _ntfCronFailed, _ntfVrfQueued, @@ -191,8 +236,13 @@ instance StrEncoding NtfServerStatsData where "subCreated=" <> strEncode _subCreated, "subDeleted=" <> strEncode _subDeleted, "ntfReceived=" <> strEncode _ntfReceived, + "ntfReceivedAuth=" <> strEncode _ntfReceivedAuth, "ntfDelivered=" <> strEncode _ntfDelivered, "ntfFailed=" <> strEncode _ntfFailed, + "ntfReceivedOwn=" <> strEncode _ntfReceivedOwn, + "ntfReceivedAuthOwn=" <> strEncode _ntfReceivedAuthOwn, + "ntfDeliveredOwn=" <> strEncode _ntfDeliveredOwn, + "ntfFailedOwn=" <> strEncode _ntfFailedOwn, "ntfCronDelivered=" <> strEncode _ntfCronDelivered, "ntfCronFailed=" <> strEncode _ntfCronFailed, "ntfVrfQueued=" <> strEncode _ntfVrfQueued, @@ -213,8 +263,13 @@ instance StrEncoding NtfServerStatsData where _subCreated <- "subCreated=" *> strP <* A.endOfLine _subDeleted <- "subDeleted=" *> strP <* A.endOfLine _ntfReceived <- "ntfReceived=" *> strP <* A.endOfLine + _ntfReceivedAuth <- opt "ntfReceivedAuth=" _ntfDelivered <- "ntfDelivered=" *> strP <* A.endOfLine _ntfFailed <- opt "ntfFailed=" + _ntfReceivedOwn <- statByServerP "ntfReceivedOwn=" + _ntfReceivedAuthOwn <- statByServerP "ntfReceivedAuthOwn=" + _ntfDeliveredOwn <- statByServerP "ntfDeliveredOwn=" + _ntfFailedOwn <- statByServerP "ntfFailedOwn=" _ntfCronDelivered <- opt "ntfCronDelivered=" _ntfCronFailed <- opt "ntfCronFailed=" _ntfVrfQueued <- opt "ntfVrfQueued=" @@ -235,8 +290,13 @@ instance StrEncoding NtfServerStatsData where _subCreated, _subDeleted, _ntfReceived, + _ntfReceivedAuth, _ntfDelivered, _ntfFailed, + _ntfReceivedOwn, + _ntfReceivedAuthOwn, + _ntfDeliveredOwn, + _ntfFailedOwn, _ntfCronDelivered, _ntfCronFailed, _ntfVrfQueued, @@ -248,3 +308,26 @@ instance StrEncoding NtfServerStatsData where } where opt s = A.string s *> strP <* A.endOfLine <|> pure 0 + statByServerP s = A.string s *> strP <* A.endOfLine <|> pure (StatsByServerData []) + +type StatsByServer = TMap Text (TVar Int) + +newtype StatsByServerData = StatsByServerData [(Text, Int)] + +instance StrEncoding StatsByServerData where + strEncode (StatsByServerData d) = strEncodeList d + strP = StatsByServerData <$> serverP `A.sepBy'` A.char ',' + where + serverP = (,) <$> strP_ <*> A.decimal + +getStatsByServer :: TMap Text (TVar Int) -> IO StatsByServerData +getStatsByServer s = readTVarIO s >>= fmap (StatsByServerData . M.toList) . mapM readTVarIO + +setStatsByServer :: TMap Text (TVar Int) -> StatsByServerData -> IO () +setStatsByServer s (StatsByServerData d) = mapM newTVarIO (M.fromList d) >>= atomically . writeTVar s + +-- double lookup avoids STM transaction with a shared map in most cases +incServerStat :: Text -> TMap Text (TVar Int) -> IO () +incServerStat h s = TM.lookupIO h s >>= atomically . maybe newServerStat (`modifyTVar'` (+ 1)) + where + newServerStat = TM.lookup h s >>= maybe (TM.insertM h (newTVar 1) s) (`modifyTVar'` (+ 1))