mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-29 16:39:58 +00:00
ntf server: additional statistics (#1558)
* ntf server: additional statistics * version * fix stats * add stats to track notifications without active token * refactor * fix stats parser * version
This commit is contained in:
@@ -74,7 +74,6 @@ import Simplex.Messaging.Protocol
|
||||
subscriberParty,
|
||||
subscriberServiceRole
|
||||
)
|
||||
import qualified Simplex.Messaging.Protocol as SMP
|
||||
import Simplex.Messaging.Session
|
||||
import Simplex.Messaging.TMap (TMap)
|
||||
import qualified Simplex.Messaging.TMap as TM
|
||||
|
||||
@@ -43,7 +43,7 @@ import qualified Data.X509 as X
|
||||
import qualified Data.X509.Validation as XV
|
||||
import Simplex.Messaging.Encoding
|
||||
import Simplex.Messaging.Parsers (parseAll)
|
||||
import Simplex.Messaging.Util (bshow, (<$?>))
|
||||
import Simplex.Messaging.Util (bshow, safeDecodeUtf8, (<$?>))
|
||||
|
||||
class TextEncoding a where
|
||||
textEncode :: a -> Text
|
||||
@@ -91,6 +91,10 @@ instance StrEncoding String where
|
||||
strEncode = strEncode . B.pack
|
||||
strP = B.unpack <$> strP
|
||||
|
||||
instance StrEncoding Text where
|
||||
strEncode = encodeUtf8
|
||||
strP = safeDecodeUtf8 <$> A.takeTill (\c -> c == ' ' || c == '\n')
|
||||
|
||||
instance ToJSON Str where
|
||||
toJSON (Str s) = strToJSON s
|
||||
toEncoding (Str s) = strToJEncoding s
|
||||
|
||||
@@ -281,7 +281,7 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg, startOptions}
|
||||
getSMPServiceSubMetrics a sel subQueueCount = getSubMetrics_ a sel countSubs
|
||||
where
|
||||
countSubs :: (NtfSMPSubMetrics, S.Set Text) -> (SMPServer, TVar (Maybe sub)) -> IO (NtfSMPSubMetrics, S.Set Text)
|
||||
countSubs acc (srv, serviceSubs) = maybe acc (subMetricsResult a acc srv . fromIntegral . subQueueCount) <$> readTVarIO serviceSubs
|
||||
countSubs acc (srv, serviceSubs) = subMetricsResult a acc srv . fromIntegral . maybe 0 subQueueCount <$> readTVarIO serviceSubs
|
||||
|
||||
getSMPSubMetrics :: SMPClientAgent 'Notifier -> (SMPClientAgent 'Notifier -> TMap SMPServer (TMap NotifierId a)) -> IO NtfSMPSubMetrics
|
||||
getSMPSubMetrics a sel = getSubMetrics_ a sel countSubs
|
||||
@@ -305,11 +305,11 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg, startOptions}
|
||||
| isOwnServer a srv =
|
||||
let !ownSrvSubs' = M.alter (Just . maybe cnt (+ cnt)) host ownSrvSubs
|
||||
metrics' = metrics {ownSrvSubs = ownSrvSubs'} :: NtfSMPSubMetrics
|
||||
in (metrics', otherSrvs)
|
||||
in (metrics', otherSrvs)
|
||||
| cnt == 0 = acc
|
||||
| otherwise =
|
||||
let metrics' = metrics {otherSrvSubCount = otherSrvSubCount + cnt} :: NtfSMPSubMetrics
|
||||
in (metrics', S.insert host otherSrvs)
|
||||
in (metrics', S.insert host otherSrvs)
|
||||
where
|
||||
NtfSMPSubMetrics {ownSrvSubs, otherSrvSubCount} = metrics
|
||||
host = safeDecodeUtf8 $ strEncode h
|
||||
@@ -527,7 +527,7 @@ ntfSubscriber NtfSubscriber {smpAgent = ca@SMPClientAgent {msgQ, agentQ}} =
|
||||
NtfPushServer {pushQ} <- asks pushServer
|
||||
stats <- asks serverStats
|
||||
liftIO $ forever $ do
|
||||
((_, srv, _), _thVersion, sessionId, ts) <- atomically $ readTBQueue msgQ
|
||||
((_, srv@(SMPServer (h :| _) _ _), _), _thVersion, sessionId, ts) <- atomically $ readTBQueue msgQ
|
||||
forM ts $ \(ntfId, t) -> case t of
|
||||
STUnexpectedError e -> logError $ "SMP client unexpected error: " <> tshow e -- uncorrelated response, should not happen
|
||||
STResponse {} -> pure () -- it was already reported as timeout error
|
||||
@@ -538,9 +538,16 @@ ntfSubscriber NtfSubscriber {smpAgent = ca@SMPClientAgent {msgQ, agentQ}} =
|
||||
ntfTs <- getSystemTime
|
||||
updatePeriodStats (activeSubs stats) ntfId
|
||||
let newNtf = PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta}
|
||||
ntfs_ <- addTokenLastNtf st newNtf
|
||||
forM_ ntfs_ $ \(tkn, lastNtfs) -> atomically $ writeTBQueue pushQ (tkn, PNMessage lastNtfs)
|
||||
incNtfStat_ stats ntfReceived
|
||||
srvHost_ = if isOwnServer ca srv then Just (safeDecodeUtf8 $ strEncode h) else Nothing
|
||||
addTokenLastNtf st newNtf >>= \case
|
||||
Right (tkn, lastNtfs) -> do
|
||||
atomically $ writeTBQueue pushQ (srvHost_, tkn, PNMessage lastNtfs)
|
||||
incNtfStat_ stats ntfReceived
|
||||
mapM_ (`incServerStat` ntfReceivedOwn stats) srvHost_
|
||||
Left AUTH -> do
|
||||
incNtfStat_ stats ntfReceivedAuth
|
||||
mapM_ (`incServerStat` ntfReceivedAuthOwn stats) srvHost_
|
||||
Left _ -> pure ()
|
||||
Right SMP.END ->
|
||||
whenM (atomically $ activeClientSession' ca sessionId srv) $
|
||||
void $ updateSrvSubStatus st smpQueue NSEnd
|
||||
@@ -625,7 +632,7 @@ showServer' = decodeLatin1 . strEncode . host
|
||||
|
||||
ntfPush :: NtfPushServer -> M ()
|
||||
ntfPush s@NtfPushServer {pushQ} = forever $ do
|
||||
(tkn@NtfTknRec {ntfTknId, token = t@(DeviceToken pp _), tknStatus}, ntf) <- atomically (readTBQueue pushQ)
|
||||
(srvHost_, tkn@NtfTknRec {ntfTknId, token = t@(DeviceToken pp _), tknStatus}, ntf) <- atomically (readTBQueue pushQ)
|
||||
liftIO $ logDebug $ "sending push notification to " <> T.pack (show pp)
|
||||
st <- asks store
|
||||
case ntf of
|
||||
@@ -644,8 +651,14 @@ ntfPush s@NtfPushServer {pushQ} = forever $ do
|
||||
PNMessage {} -> checkActiveTkn tknStatus $ do
|
||||
stats <- asks serverStats
|
||||
liftIO $ updatePeriodStats (activeTokens stats) ntfTknId
|
||||
liftIO (deliverNotification st pp tkn ntf)
|
||||
>>= incNtfStatT t . (\case Left _ -> ntfFailed; Right () -> ntfDelivered)
|
||||
liftIO (deliverNotification st pp tkn ntf) >>= \case
|
||||
Left _ -> do
|
||||
incNtfStatT t ntfFailed
|
||||
liftIO $ mapM_ (`incServerStat` ntfFailedOwn stats) srvHost_
|
||||
Right () -> do
|
||||
incNtfStatT t ntfDelivered
|
||||
liftIO $ mapM_ (`incServerStat` ntfDeliveredOwn stats) srvHost_
|
||||
|
||||
where
|
||||
checkActiveTkn :: NtfTknStatus -> M () -> M ()
|
||||
checkActiveTkn status action
|
||||
@@ -686,7 +699,7 @@ periodicNtfsThread NtfPushServer {pushQ} = do
|
||||
liftIO $ forever $ do
|
||||
threadDelay interval
|
||||
now <- systemSeconds <$> getSystemTime
|
||||
cnt <- withPeriodicNtfTokens st now $ \tkn -> atomically $ writeTBQueue pushQ (tkn, PNCheckMessages)
|
||||
cnt <- withPeriodicNtfTokens st now $ \tkn -> atomically $ writeTBQueue pushQ (Nothing, tkn, PNCheckMessages)
|
||||
logNote $ "Scheduled periodic notifications: " <> tshow cnt
|
||||
|
||||
runNtfClientTransport :: Transport c => THandleNTF c 'TServer -> M ()
|
||||
@@ -794,7 +807,7 @@ client NtfServerClient {rcvQ, sndQ} ns@NtfSubscriber {smpAgent = ca} NtfPushServ
|
||||
ts <- liftIO $ getSystemDate
|
||||
let tkn = mkNtfTknRec tknId newTkn srvDhPrivKey dhSecret regCode ts
|
||||
withNtfStore (`addNtfToken` tkn) $ \_ -> do
|
||||
atomically $ writeTBQueue pushQ (tkn, PNVerification regCode)
|
||||
atomically $ writeTBQueue pushQ (Nothing, tkn, PNVerification regCode)
|
||||
incNtfStatT token ntfVrfQueued
|
||||
incNtfStatT token tknCreated
|
||||
pure $ NRTknId tknId srvDhPubKey
|
||||
@@ -810,7 +823,7 @@ client NtfServerClient {rcvQ, sndQ} ns@NtfSubscriber {smpAgent = ca} NtfPushServ
|
||||
| otherwise -> withNtfStore (\st -> updateTknStatus st tkn NTRegistered) $ \_ -> sendVerification
|
||||
where
|
||||
sendVerification = do
|
||||
atomically $ writeTBQueue pushQ (tkn, PNVerification tknRegCode)
|
||||
atomically $ writeTBQueue pushQ (Nothing, tkn, PNVerification tknRegCode)
|
||||
incNtfStatT token ntfVrfQueued
|
||||
pure $ NRTknId ntfTknId $ C.publicKey tknDhPrivKey
|
||||
TVFY code -- this allows repeated verification for cases when client connection dropped before server response
|
||||
@@ -828,7 +841,7 @@ client NtfServerClient {rcvQ, sndQ} ns@NtfSubscriber {smpAgent = ca} NtfPushServ
|
||||
regCode <- getRegCode
|
||||
let tkn' = tkn {token = token', tknStatus = NTRegistered, tknRegCode = regCode}
|
||||
withNtfStore (`replaceNtfToken` tkn') $ \_ -> do
|
||||
atomically $ writeTBQueue pushQ (tkn', PNVerification regCode)
|
||||
atomically $ writeTBQueue pushQ (Nothing, tkn', PNVerification regCode)
|
||||
incNtfStatT token ntfVrfQueued
|
||||
incNtfStatT token tknReplaced
|
||||
pure NROk
|
||||
|
||||
@@ -146,7 +146,7 @@ data SMPSubscriber = SMPSubscriber
|
||||
}
|
||||
|
||||
data NtfPushServer = NtfPushServer
|
||||
{ pushQ :: TBQueue (NtfTknRec, PushNotification),
|
||||
{ pushQ :: TBQueue (Maybe T.Text, NtfTknRec, PushNotification), -- Maybe Text is a hostname of "own" server
|
||||
pushClients :: TMap PushProvider PushProviderClient,
|
||||
apnsConfig :: APNSPushClientConfig
|
||||
}
|
||||
|
||||
@@ -76,8 +76,13 @@ ntfPrometheusMetrics sm rtm ts =
|
||||
_subCreated,
|
||||
_subDeleted,
|
||||
_ntfReceived,
|
||||
_ntfReceivedAuth,
|
||||
_ntfDelivered,
|
||||
_ntfFailed,
|
||||
_ntfReceivedOwn,
|
||||
_ntfReceivedAuthOwn,
|
||||
_ntfDeliveredOwn,
|
||||
_ntfFailedOwn,
|
||||
_ntfCronDelivered,
|
||||
_ntfCronFailed,
|
||||
_ntfVrfQueued,
|
||||
@@ -165,6 +170,10 @@ ntfPrometheusMetrics sm rtm ts =
|
||||
\# TYPE simplex_ntf_notifications_received counter\n\
|
||||
\simplex_ntf_notifications_received " <> mshow _ntfReceived <> "\n# ntfReceived\n\
|
||||
\\n\
|
||||
\# HELP simplex_ntf_notifications_received_auth Received notifications without token or subscription (AUTH error)\n\
|
||||
\# TYPE simplex_ntf_notifications_received_auth counter\n\
|
||||
\simplex_ntf_notifications_received_auth " <> mshow _ntfReceivedAuth <> "\n# ntfReceivedAuth\n\
|
||||
\\n\
|
||||
\# HELP simplex_ntf_notifications_delivered Delivered notifications\n\
|
||||
\# TYPE simplex_ntf_notifications_delivered counter\n\
|
||||
\simplex_ntf_notifications_delivered " <> mshow _ntfDelivered <> "\n# ntfDelivered\n\
|
||||
@@ -201,6 +210,10 @@ ntfPrometheusMetrics sm rtm ts =
|
||||
\# TYPE simplex_ntf_notifications_total gauge\n\
|
||||
\simplex_ntf_notifications_total " <> mshow lastNtfCount <> "\n# lastNtfCount\n\
|
||||
\\n"
|
||||
<> showNtfsByServer _ntfReceivedOwn "simplex_ntf_notifications_received_own" "Received notifications" "ntfReceivedOwn"
|
||||
<> showNtfsByServer _ntfReceivedAuthOwn "simplex_ntf_notifications_received_auth_own" "Received notifications without token or subscription (AUTH error)" "ntfReceivedAuthOwn"
|
||||
<> showNtfsByServer _ntfDeliveredOwn "simplex_ntf_notifications_delivered_own" "Delivered notifications" "ntfDeliveredOwn"
|
||||
<> showNtfsByServer _ntfFailedOwn "simplex_ntf_notifications_failed_own" "Failed notifications" "ntfFailedOwn"
|
||||
info =
|
||||
"# Info\n\
|
||||
\# ----\n\
|
||||
@@ -228,28 +241,41 @@ ntfPrometheusMetrics sm rtm ts =
|
||||
showOwnSrvSubs <> showOtherSrvSubs
|
||||
where
|
||||
showOwnSrvSubs
|
||||
| M.null ownSrvSubs = showOwn_ "" 0 0
|
||||
| otherwise = T.concat $ map (\(host, cnt) -> showOwn_ (metricHost host) 1 cnt) $ M.assocs ownSrvSubs
|
||||
showOwn_ param srvCnt subCnt =
|
||||
gaugeMetric (mPfx <> "server_count_own") param srvCnt (descrPfx <> " SMP subscriptions, own server count") "ownSrvSubs server"
|
||||
<> gaugeMetric (mPfx <> "sub_count_own") param subCnt (descrPfx <> " SMP subscriptions count for own servers") "ownSrvSubs count"
|
||||
| M.null ownSrvSubs = ""
|
||||
| otherwise =
|
||||
gaugeMetrics (mPfx <> "server_count_own") srvMetrics (descrPfx <> " SMP subscriptions, own server count") "ownSrvSubs server"
|
||||
<> gaugeMetrics (mPfx <> "sub_count_own") subMetrics (descrPfx <> " SMP subscriptions count for own servers") "ownSrvSubs count"
|
||||
where
|
||||
subs = M.assocs ownSrvSubs
|
||||
srvMetrics = map (\(host, _) -> (metricHost host, 1)) subs
|
||||
subMetrics = map (\(host, cnt) -> (metricHost host, cnt)) subs
|
||||
showOtherSrvSubs =
|
||||
gaugeMetric (mPfx <> "server_count_other") "" otherServers (descrPfx <> " SMP subscriptions, other server count") "otherServers"
|
||||
<> gaugeMetric (mPfx <> "sub_count_other") "" otherSrvSubCount (descrPfx <> " SMP subscriptions count for other servers") "otherSrvSubCount"
|
||||
gaugeMetrics (mPfx <> "server_count_other") [("", otherServers)] (descrPfx <> " SMP subscriptions, other server count") "otherServers"
|
||||
<> gaugeMetrics (mPfx <> "sub_count_other") [("", otherSrvSubCount)] (descrPfx <> " SMP subscriptions count for other servers") "otherSrvSubCount"
|
||||
showNtfsByServer (StatsByServerData srvNtfs) mName descr varName
|
||||
| null srvNtfs = ""
|
||||
| otherwise =
|
||||
"# HELP " <> mName <> " " <> descr <> "\n\
|
||||
\# TYPE " <> mName <> " counter\n"
|
||||
<> showNtfMetrics
|
||||
<> "# " <> varName <> "\n\n"
|
||||
where
|
||||
showNtfMetrics = T.concat $ map (\(host, value) -> mName <> metricHost host <> " " <> mshow value <> "\n") srvNtfs
|
||||
showWorkerMetric NtfSMPWorkerMetrics {ownServers, otherServers} mPfx descrPfx =
|
||||
showOwnServers <> showOtherServers
|
||||
where
|
||||
showOwnServers
|
||||
| null ownServers = showOwn_ "" 0
|
||||
| otherwise = T.concat $ map (\host -> showOwn_ (metricHost host) 1) ownServers
|
||||
showOwn_ param cnt = gaugeMetric (mPfx <> "count_own") param cnt (descrPfx <> " count for own servers") "ownServers"
|
||||
showOtherServers = gaugeMetric (mPfx <> "count_other") "" otherServers (descrPfx <> " count for other servers") "otherServers"
|
||||
gaugeMetric :: Text -> Text -> Int -> Text -> Text -> Text
|
||||
gaugeMetric name param value descr codeRef =
|
||||
| null ownServers = ""
|
||||
| otherwise = gaugeMetrics (mPfx <> "count_own") subMetrics (descrPfx <> " count for own servers") "ownServers"
|
||||
where
|
||||
subMetrics = map (\host -> (metricHost host, 1)) ownServers
|
||||
showOtherServers = gaugeMetrics (mPfx <> "count_other") [("", otherServers)] (descrPfx <> " count for other servers") "otherServers"
|
||||
gaugeMetrics :: Text -> [(Text, Int)] -> Text -> Text -> Text
|
||||
gaugeMetrics name subMetrics descr codeRef =
|
||||
"# HELP " <> name <> " " <> descr <> "\n\
|
||||
\# TYPE " <> name <> " gauge\n\
|
||||
\" <> name <> param <> " " <> mshow value <> "\n# " <> codeRef <> "\n\
|
||||
\\n"
|
||||
\# TYPE " <> name <> " gauge\n"
|
||||
<> T.concat (map (\(param, value) -> name <> param <> " " <> mshow value <> "\n") subMetrics)
|
||||
<> "# " <> codeRef <> "\n\n"
|
||||
metricHost host = "{server=\"" <> host <> "\"}"
|
||||
mstr a = a <> " " <> tsEpoch
|
||||
mshow :: Show a => a -> Text
|
||||
|
||||
@@ -5,12 +5,17 @@
|
||||
module Simplex.Messaging.Notifications.Server.Stats where
|
||||
|
||||
import Control.Applicative (optional, (<|>))
|
||||
import Control.Concurrent.STM
|
||||
import qualified Data.Attoparsec.ByteString.Char8 as A
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.IORef
|
||||
import qualified Data.Map.Strict as M
|
||||
import Data.Text (Text)
|
||||
import Data.Time.Clock (UTCTime)
|
||||
import Simplex.Messaging.Encoding.String
|
||||
import Simplex.Messaging.Server.Stats
|
||||
import Simplex.Messaging.TMap (TMap)
|
||||
import qualified Simplex.Messaging.TMap as TM
|
||||
|
||||
data NtfServerStats = NtfServerStats
|
||||
{ fromTime :: IORef UTCTime,
|
||||
@@ -21,8 +26,13 @@ data NtfServerStats = NtfServerStats
|
||||
subCreated :: IORef Int,
|
||||
subDeleted :: IORef Int,
|
||||
ntfReceived :: IORef Int,
|
||||
ntfReceivedAuth :: IORef Int,
|
||||
ntfDelivered :: IORef Int,
|
||||
ntfFailed :: IORef Int,
|
||||
ntfReceivedOwn :: StatsByServer,
|
||||
ntfReceivedAuthOwn :: StatsByServer,
|
||||
ntfDeliveredOwn :: StatsByServer,
|
||||
ntfFailedOwn :: StatsByServer,
|
||||
ntfCronDelivered :: IORef Int,
|
||||
ntfCronFailed :: IORef Int,
|
||||
ntfVrfQueued :: IORef Int,
|
||||
@@ -42,8 +52,13 @@ data NtfServerStatsData = NtfServerStatsData
|
||||
_subCreated :: Int,
|
||||
_subDeleted :: Int,
|
||||
_ntfReceived :: Int,
|
||||
_ntfReceivedAuth :: Int,
|
||||
_ntfDelivered :: Int,
|
||||
_ntfFailed :: Int,
|
||||
_ntfReceivedOwn :: StatsByServerData,
|
||||
_ntfReceivedAuthOwn :: StatsByServerData,
|
||||
_ntfDeliveredOwn :: StatsByServerData,
|
||||
_ntfFailedOwn :: StatsByServerData,
|
||||
_ntfCronDelivered :: Int,
|
||||
_ntfCronFailed :: Int,
|
||||
_ntfVrfQueued :: Int,
|
||||
@@ -64,8 +79,13 @@ newNtfServerStats ts = do
|
||||
subCreated <- newIORef 0
|
||||
subDeleted <- newIORef 0
|
||||
ntfReceived <- newIORef 0
|
||||
ntfReceivedAuth <- newIORef 0
|
||||
ntfDelivered <- newIORef 0
|
||||
ntfFailed <- newIORef 0
|
||||
ntfReceivedOwn <- TM.emptyIO
|
||||
ntfReceivedAuthOwn <- TM.emptyIO
|
||||
ntfDeliveredOwn <- TM.emptyIO
|
||||
ntfFailedOwn <- TM.emptyIO
|
||||
ntfCronDelivered <- newIORef 0
|
||||
ntfCronFailed <- newIORef 0
|
||||
ntfVrfQueued <- newIORef 0
|
||||
@@ -84,8 +104,13 @@ newNtfServerStats ts = do
|
||||
subCreated,
|
||||
subDeleted,
|
||||
ntfReceived,
|
||||
ntfReceivedAuth,
|
||||
ntfDelivered,
|
||||
ntfFailed,
|
||||
ntfReceivedOwn,
|
||||
ntfReceivedAuthOwn,
|
||||
ntfDeliveredOwn,
|
||||
ntfFailedOwn,
|
||||
ntfCronDelivered,
|
||||
ntfCronFailed,
|
||||
ntfVrfQueued,
|
||||
@@ -106,8 +131,13 @@ getNtfServerStatsData s@NtfServerStats {fromTime} = do
|
||||
_subCreated <- readIORef $ subCreated s
|
||||
_subDeleted <- readIORef $ subDeleted s
|
||||
_ntfReceived <- readIORef $ ntfReceived s
|
||||
_ntfReceivedAuth <- readIORef $ ntfReceivedAuth s
|
||||
_ntfDelivered <- readIORef $ ntfDelivered s
|
||||
_ntfFailed <- readIORef $ ntfFailed s
|
||||
_ntfReceivedOwn <- getStatsByServer $ ntfReceivedOwn s
|
||||
_ntfReceivedAuthOwn <- getStatsByServer $ ntfReceivedAuthOwn s
|
||||
_ntfDeliveredOwn <- getStatsByServer $ ntfDeliveredOwn s
|
||||
_ntfFailedOwn <- getStatsByServer $ ntfFailedOwn s
|
||||
_ntfCronDelivered <- readIORef $ ntfCronDelivered s
|
||||
_ntfCronFailed <- readIORef $ ntfCronFailed s
|
||||
_ntfVrfQueued <- readIORef $ ntfVrfQueued s
|
||||
@@ -126,8 +156,13 @@ getNtfServerStatsData s@NtfServerStats {fromTime} = do
|
||||
_subCreated,
|
||||
_subDeleted,
|
||||
_ntfReceived,
|
||||
_ntfReceivedAuth,
|
||||
_ntfDelivered,
|
||||
_ntfFailed,
|
||||
_ntfReceivedOwn,
|
||||
_ntfReceivedAuthOwn,
|
||||
_ntfDeliveredOwn,
|
||||
_ntfFailedOwn,
|
||||
_ntfCronDelivered,
|
||||
_ntfCronFailed,
|
||||
_ntfVrfQueued,
|
||||
@@ -149,8 +184,13 @@ setNtfServerStats s@NtfServerStats {fromTime} d@NtfServerStatsData {_fromTime} =
|
||||
writeIORef (subCreated s) $! _subCreated d
|
||||
writeIORef (subDeleted s) $! _subDeleted d
|
||||
writeIORef (ntfReceived s) $! _ntfReceived d
|
||||
writeIORef (ntfReceivedAuth s) $! _ntfReceivedAuth d
|
||||
writeIORef (ntfDelivered s) $! _ntfDelivered d
|
||||
writeIORef (ntfFailed s) $! _ntfFailed d
|
||||
setStatsByServer (ntfReceivedOwn s) $! _ntfReceivedOwn d
|
||||
setStatsByServer (ntfReceivedAuthOwn s) $! _ntfReceivedAuthOwn d
|
||||
setStatsByServer (ntfDeliveredOwn s) $! _ntfDeliveredOwn d
|
||||
setStatsByServer (ntfFailedOwn s) $! _ntfFailedOwn d
|
||||
writeIORef (ntfCronDelivered s) $! _ntfCronDelivered d
|
||||
writeIORef (ntfCronFailed s) $! _ntfCronFailed d
|
||||
writeIORef (ntfVrfQueued s) $! _ntfVrfQueued d
|
||||
@@ -171,8 +211,13 @@ instance StrEncoding NtfServerStatsData where
|
||||
_subCreated,
|
||||
_subDeleted,
|
||||
_ntfReceived,
|
||||
_ntfReceivedAuth,
|
||||
_ntfDelivered,
|
||||
_ntfFailed,
|
||||
_ntfReceivedOwn,
|
||||
_ntfReceivedAuthOwn,
|
||||
_ntfDeliveredOwn,
|
||||
_ntfFailedOwn,
|
||||
_ntfCronDelivered,
|
||||
_ntfCronFailed,
|
||||
_ntfVrfQueued,
|
||||
@@ -191,8 +236,13 @@ instance StrEncoding NtfServerStatsData where
|
||||
"subCreated=" <> strEncode _subCreated,
|
||||
"subDeleted=" <> strEncode _subDeleted,
|
||||
"ntfReceived=" <> strEncode _ntfReceived,
|
||||
"ntfReceivedAuth=" <> strEncode _ntfReceivedAuth,
|
||||
"ntfDelivered=" <> strEncode _ntfDelivered,
|
||||
"ntfFailed=" <> strEncode _ntfFailed,
|
||||
"ntfReceivedOwn=" <> strEncode _ntfReceivedOwn,
|
||||
"ntfReceivedAuthOwn=" <> strEncode _ntfReceivedAuthOwn,
|
||||
"ntfDeliveredOwn=" <> strEncode _ntfDeliveredOwn,
|
||||
"ntfFailedOwn=" <> strEncode _ntfFailedOwn,
|
||||
"ntfCronDelivered=" <> strEncode _ntfCronDelivered,
|
||||
"ntfCronFailed=" <> strEncode _ntfCronFailed,
|
||||
"ntfVrfQueued=" <> strEncode _ntfVrfQueued,
|
||||
@@ -213,8 +263,13 @@ instance StrEncoding NtfServerStatsData where
|
||||
_subCreated <- "subCreated=" *> strP <* A.endOfLine
|
||||
_subDeleted <- "subDeleted=" *> strP <* A.endOfLine
|
||||
_ntfReceived <- "ntfReceived=" *> strP <* A.endOfLine
|
||||
_ntfReceivedAuth <- opt "ntfReceivedAuth="
|
||||
_ntfDelivered <- "ntfDelivered=" *> strP <* A.endOfLine
|
||||
_ntfFailed <- opt "ntfFailed="
|
||||
_ntfReceivedOwn <- statByServerP "ntfReceivedOwn="
|
||||
_ntfReceivedAuthOwn <- statByServerP "ntfReceivedAuthOwn="
|
||||
_ntfDeliveredOwn <- statByServerP "ntfDeliveredOwn="
|
||||
_ntfFailedOwn <- statByServerP "ntfFailedOwn="
|
||||
_ntfCronDelivered <- opt "ntfCronDelivered="
|
||||
_ntfCronFailed <- opt "ntfCronFailed="
|
||||
_ntfVrfQueued <- opt "ntfVrfQueued="
|
||||
@@ -235,8 +290,13 @@ instance StrEncoding NtfServerStatsData where
|
||||
_subCreated,
|
||||
_subDeleted,
|
||||
_ntfReceived,
|
||||
_ntfReceivedAuth,
|
||||
_ntfDelivered,
|
||||
_ntfFailed,
|
||||
_ntfReceivedOwn,
|
||||
_ntfReceivedAuthOwn,
|
||||
_ntfDeliveredOwn,
|
||||
_ntfFailedOwn,
|
||||
_ntfCronDelivered,
|
||||
_ntfCronFailed,
|
||||
_ntfVrfQueued,
|
||||
@@ -248,3 +308,26 @@ instance StrEncoding NtfServerStatsData where
|
||||
}
|
||||
where
|
||||
opt s = A.string s *> strP <* A.endOfLine <|> pure 0
|
||||
statByServerP s = A.string s *> strP <* A.endOfLine <|> pure (StatsByServerData [])
|
||||
|
||||
type StatsByServer = TMap Text (TVar Int)
|
||||
|
||||
newtype StatsByServerData = StatsByServerData [(Text, Int)]
|
||||
|
||||
instance StrEncoding StatsByServerData where
|
||||
strEncode (StatsByServerData d) = strEncodeList d
|
||||
strP = StatsByServerData <$> serverP `A.sepBy'` A.char ','
|
||||
where
|
||||
serverP = (,) <$> strP_ <*> A.decimal
|
||||
|
||||
getStatsByServer :: TMap Text (TVar Int) -> IO StatsByServerData
|
||||
getStatsByServer s = readTVarIO s >>= fmap (StatsByServerData . M.toList) . mapM readTVarIO
|
||||
|
||||
setStatsByServer :: TMap Text (TVar Int) -> StatsByServerData -> IO ()
|
||||
setStatsByServer s (StatsByServerData d) = mapM newTVarIO (M.fromList d) >>= atomically . writeTVar s
|
||||
|
||||
-- double lookup avoids STM transaction with a shared map in most cases
|
||||
incServerStat :: Text -> TMap Text (TVar Int) -> IO ()
|
||||
incServerStat h s = TM.lookupIO h s >>= atomically . maybe newServerStat (`modifyTVar'` (+ 1))
|
||||
where
|
||||
newServerStat = TM.lookup h s >>= maybe (TM.insertM h (newTVar 1) s) (`modifyTVar'` (+ 1))
|
||||
|
||||
Reference in New Issue
Block a user