diff --git a/simplexmq.cabal b/simplexmq.cabal index 43eb6b31c..e3e1e61e7 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -252,6 +252,7 @@ library Simplex.Messaging.Notifications.Server.Control Simplex.Messaging.Notifications.Server.Env Simplex.Messaging.Notifications.Server.Main + Simplex.Messaging.Notifications.Server.Prometheus Simplex.Messaging.Notifications.Server.Push.APNS Simplex.Messaging.Notifications.Server.Push.APNS.Internal Simplex.Messaging.Notifications.Server.Stats diff --git a/src/Simplex/Messaging/Notifications/Server.hs b/src/Simplex/Messaging/Notifications/Server.hs index a67d03b9d..55ab37885 100644 --- a/src/Simplex/Messaging/Notifications/Server.hs +++ b/src/Simplex/Messaging/Notifications/Server.hs @@ -12,9 +12,11 @@ {-# LANGUAGE PatternSynonyms #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TupleSections #-} +{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-} module Simplex.Messaging.Notifications.Server where +import Control.Concurrent (threadDelay) import Control.Logger.Simple import Control.Monad import Control.Monad.Except @@ -27,13 +29,15 @@ import Data.Functor (($>)) import Data.IORef import Data.Int (Int64) import qualified Data.IntSet as IS -import Data.List (intercalate, partition, sort) +import Data.List (foldl', intercalate) import Data.List.NonEmpty (NonEmpty (..)) import qualified Data.List.NonEmpty as L import qualified Data.Map.Strict as M import Data.Maybe (mapMaybe) import qualified Data.Set as S +import Data.Text (Text) import qualified Data.Text as T +import qualified Data.Text.IO as T import Data.Text.Encoding (decodeLatin1) import Data.Time.Clock (UTCTime (..), diffTimeToPicoseconds, getCurrentTime) import Data.Time.Clock.System (getSystemTime) @@ -48,6 +52,7 @@ import Simplex.Messaging.Encoding.String import Simplex.Messaging.Notifications.Protocol import Simplex.Messaging.Notifications.Server.Control import Simplex.Messaging.Notifications.Server.Env +import Simplex.Messaging.Notifications.Server.Prometheus 
import Simplex.Messaging.Notifications.Server.Push.APNS (PushNotification (..), PushProviderError (..)) import Simplex.Messaging.Notifications.Server.Stats import Simplex.Messaging.Notifications.Server.Store (NtfSTMStore, TokenNtfMessageRecord (..), stmStoreTokenLastNtf) @@ -60,13 +65,14 @@ import Simplex.Messaging.Server import Simplex.Messaging.Server.Control (CPClientRole (..)) import Simplex.Messaging.Server.Env.STM (StartOptions (..)) import Simplex.Messaging.Server.QueueStore (getSystemDate) -import Simplex.Messaging.Server.Stats (PeriodStats (..), PeriodStatCounts (..), periodStatCounts, updatePeriodStats) +import Simplex.Messaging.Server.Stats (PeriodStats (..), PeriodStatCounts (..), periodStatCounts, periodStatDataCounts, updatePeriodStats) import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Transport (ATransport (..), THandle (..), THandleAuth (..), THandleParams (..), TProxy, Transport (..), TransportPeer (..), defaultSupportedParams) import Simplex.Messaging.Transport.Buffer (trimCR) import Simplex.Messaging.Transport.Server (AddHTTP, runTransportServer, runLocalTCPServer) import Simplex.Messaging.Util +import System.Environment (lookupEnv) import System.Exit (exitFailure, exitSuccess) import System.IO (BufferMode (..), hClose, hPrint, hPutStrLn, hSetBuffering, hSetNewlineMode, universalNewlineMode) import System.Mem.Weak (deRefWeak) @@ -99,7 +105,15 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg, startOptions} stopServer liftIO $ exitSuccess resubscribe s - raceAny_ (ntfSubscriber s : ntfPush ps : map runServer transports <> serverStatsThread_ cfg <> controlPortThread_ cfg) `finally` stopServer + raceAny_ + ( ntfSubscriber s + : ntfPush ps + : map runServer transports + <> serverStatsThread_ cfg + <> prometheusMetricsThread_ cfg + <> controlPortThread_ cfg + ) + `finally` stopServer where runServer :: (ServiceName, ATransport, AddHTTP) -> M () runServer (tcpPort, ATransport 
t, _addHTTP) = do @@ -193,6 +207,90 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg, startOptions} ] liftIO $ threadDelay' interval + prometheusMetricsThread_ :: NtfServerConfig -> [M ()] + prometheusMetricsThread_ NtfServerConfig {prometheusInterval = Just interval, prometheusMetricsFile} = + [savePrometheusMetrics interval prometheusMetricsFile] + prometheusMetricsThread_ _ = [] + + savePrometheusMetrics :: Int -> FilePath -> M () + savePrometheusMetrics saveInterval metricsFile = do + labelMyThread "savePrometheusMetrics" + liftIO $ putStrLn $ "Prometheus metrics saved every " <> show saveInterval <> " seconds to " <> metricsFile + st <- asks store + ss <- asks serverStats + env <- ask + rtsOpts <- liftIO $ maybe ("set " <> rtsOptionsEnv) T.pack <$> lookupEnv (T.unpack rtsOptionsEnv) + let interval = 1000000 * saveInterval + liftIO $ forever $ do + threadDelay interval + ts <- getCurrentTime + sm <- getNtfServerMetrics st ss rtsOpts + rtm <- getNtfRealTimeMetrics env + T.writeFile metricsFile $ ntfPrometheusMetrics sm rtm ts + + getNtfServerMetrics :: NtfPostgresStore -> NtfServerStats -> Text -> IO NtfServerMetrics + getNtfServerMetrics st ss rtsOptions = do + d <- getNtfServerStatsData ss + let psTkns = periodStatDataCounts $ _activeTokens d + psSubs = periodStatDataCounts $ _activeSubs d + (tokenCount, approxSubCount, lastNtfCount) <- getEntityCounts st + pure NtfServerMetrics {statsData = d, activeTokensCounts = psTkns, activeSubsCounts = psSubs, tokenCount, approxSubCount, lastNtfCount, rtsOptions} + + getNtfRealTimeMetrics :: NtfEnv -> IO NtfRealTimeMetrics + getNtfRealTimeMetrics NtfEnv {subscriber, pushServer} = do +#if MIN_VERSION_base(4,18,0) + threadsCount <- length <$> listThreads +#else + let threadsCount = 0 +#endif + let NtfSubscriber {smpSubscribers, smpAgent = a} = subscriber + NtfPushServer {pushQ} = pushServer + SMPClientAgent {smpClients, smpSessions, srvSubs, pendingSrvSubs, smpSubWorkers} = a + srvSubscribers <- 
getSMPWorkerMetrics a smpSubscribers + srvClients <- getSMPWorkerMetrics a smpClients + srvSubWorkers <- getSMPWorkerMetrics a smpSubWorkers + ntfActiveSubs <- getSMPSubMetrics a srvSubs + ntfPendingSubs <- getSMPSubMetrics a pendingSrvSubs + smpSessionCount <- M.size <$> readTVarIO smpSessions + apnsPushQLength <- fromIntegral <$> atomically (lengthTBQueue pushQ) + pure NtfRealTimeMetrics {threadsCount, srvSubscribers, srvClients, srvSubWorkers, ntfActiveSubs, ntfPendingSubs, smpSessionCount, apnsPushQLength} + where + getSMPSubMetrics :: SMPClientAgent -> TMap SMPServer (TMap SMPSub a) -> IO NtfSMPSubMetrics + getSMPSubMetrics a v = do + subs <- readTVarIO v + let metrics = NtfSMPSubMetrics {ownSrvSubs = M.empty, otherServers = 0, otherSrvSubCount = 0} + (metrics', otherSrvs) <- foldM countSubs (metrics, S.empty) $ M.assocs subs + pure (metrics' :: NtfSMPSubMetrics) {otherServers = S.size otherSrvs} + where + countSubs :: (NtfSMPSubMetrics, S.Set Text) -> (SMPServer, TMap SMPSub a) -> IO (NtfSMPSubMetrics, S.Set Text) + countSubs acc@(metrics, !otherSrvs) (srv@(SMPServer (h :| _) _ _), srvSubs) = + result . M.size <$> readTVarIO srvSubs + where + result 0 = acc + result cnt + | isOwnServer a srv = + let !ownSrvSubs' = M.alter (Just . maybe cnt (+ cnt)) host ownSrvSubs + metrics' = metrics {ownSrvSubs = ownSrvSubs'} :: NtfSMPSubMetrics + in (metrics', otherSrvs) + | otherwise = + let metrics' = metrics {otherSrvSubCount = otherSrvSubCount + cnt} :: NtfSMPSubMetrics + in (metrics', S.insert host otherSrvs) + NtfSMPSubMetrics {ownSrvSubs, otherSrvSubCount} = metrics + host = safeDecodeUtf8 $ strEncode h + + getSMPWorkerMetrics :: SMPClientAgent -> TMap SMPServer a -> IO NtfSMPWorkerMetrics + getSMPWorkerMetrics a v = workerMetrics a . 
M.keys <$> readTVarIO v + workerMetrics :: SMPClientAgent -> [SMPServer] -> NtfSMPWorkerMetrics + workerMetrics a srvs = NtfSMPWorkerMetrics {ownServers = reverse ownSrvs, otherServers} + where + (ownSrvs, otherServers) = foldl' countSrv ([], 0) srvs + countSrv (!own, !other) srv@(SMPServer (h :| _) _ _) + | isOwnServer a srv = (host : own, other) + | otherwise = (own, other + 1) + where + host = safeDecodeUtf8 $ strEncode h + + controlPortThread_ :: NtfServerConfig -> [M ()] controlPortThread_ NtfServerConfig {controlPort = Just port} = [runCPServer port] controlPortThread_ _ = [] @@ -266,59 +364,38 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg, startOptions} logError "Unauthorized control port command" hPutStrLn h "AUTH" r -> do + NtfRealTimeMetrics {threadsCount, srvSubscribers, srvClients, srvSubWorkers, ntfActiveSubs, ntfPendingSubs, smpSessionCount, apnsPushQLength} <- + getNtfRealTimeMetrics =<< unliftIO u ask #if MIN_VERSION_base(4,18,0) - threads <- liftIO listThreads - hPutStrLn h $ "Threads: " <> show (length threads) + hPutStrLn h $ "Threads: " <> show threadsCount #else hPutStrLn h "Threads: not available on GHC 8.10" #endif - NtfEnv {subscriber, pushServer} <- unliftIO u ask - let NtfSubscriber {smpSubscribers, smpAgent = a} = subscriber - NtfPushServer {pushQ} = pushServer - SMPClientAgent {smpClients, smpSessions, srvSubs, pendingSrvSubs, smpSubWorkers} = a - putSMPWorkers a "SMP subcscribers" smpSubscribers - putSMPWorkers a "SMP clients" smpClients - putSMPWorkers a "SMP subscription workers" smpSubWorkers - sessions <- readTVarIO smpSessions - hPutStrLn h $ "SMP sessions count: " <> show (M.size sessions) - putSMPSubs a "SMP subscriptions" srvSubs - putSMPSubs a "Pending SMP subscriptions" pendingSrvSubs - sz <- atomically $ lengthTBQueue pushQ - hPutStrLn h $ "Push notifications queue length: " <> show sz + putSMPWorkers "SMP subcscribers" srvSubscribers + putSMPWorkers "SMP clients" srvClients + putSMPWorkers "SMP 
subscription workers" srvSubWorkers + hPutStrLn h $ "SMP sessions count: " <> show smpSessionCount + putSMPSubs "SMP subscriptions" ntfActiveSubs + putSMPSubs "Pending SMP subscriptions" ntfPendingSubs + hPutStrLn h $ "Push notifications queue length: " <> show apnsPushQLength where - putSMPSubs :: SMPClientAgent -> String -> TMap SMPServer (TMap SMPSub a) -> IO () - putSMPSubs a name v = do - subs <- readTVarIO v - (totalCnt, ownCount, otherCnt, servers, ownByServer) <- foldM countSubs (0, 0, 0, [], M.empty) $ M.assocs subs - showServers a name servers - hPutStrLn h $ name <> " total: " <> show totalCnt - hPutStrLn h $ name <> " on own servers: " <> show ownCount - when (r == CPRAdmin && not (null ownByServer)) $ - forM_ (M.assocs ownByServer) $ \(SMPServer (host :| _) _ _, cnt) -> - hPutStrLn h $ name <> " on " <> B.unpack (strEncode host) <> ": " <> show cnt - hPutStrLn h $ name <> " on other servers: " <> show otherCnt - where - countSubs :: (Int, Int, Int, [SMPServer], M.Map SMPServer Int) -> (SMPServer, TMap SMPSub a) -> IO (Int, Int, Int, [SMPServer], M.Map SMPServer Int) - countSubs (!totalCnt, !ownCount, !otherCnt, !servers, !ownByServer) (srv, srvSubs) = do - cnt <- M.size <$> readTVarIO srvSubs - let totalCnt' = totalCnt + cnt - ownServer = isOwnServer a srv - (ownCount', otherCnt') - | ownServer = (ownCount + cnt, otherCnt) - | otherwise = (ownCount, otherCnt + cnt) - servers' = if cnt > 0 then srv : servers else servers - ownByServer' - | r == CPRAdmin && ownServer && cnt > 0 = M.alter (Just . maybe cnt (+ cnt)) srv ownByServer - | otherwise = ownByServer - pure (totalCnt', ownCount', otherCnt', servers', ownByServer') - putSMPWorkers :: SMPClientAgent -> String -> TMap SMPServer a -> IO () - putSMPWorkers a name v = readTVarIO v >>= showServers a name . 
M.keys - showServers :: SMPClientAgent -> String -> [SMPServer] -> IO () - showServers a name srvs = do - let (ownSrvs, otherSrvs) = partition (isOwnServer a) srvs - hPutStrLn h $ name <> " own servers count: " <> show (length ownSrvs) - when (r == CPRAdmin) $ hPutStrLn h $ name <> " own servers: " <> intercalate "," (sort $ map (\(SMPServer (host :| _) _ _) -> B.unpack $ strEncode host) ownSrvs) - hPutStrLn h $ name <> " other servers count: " <> show (length otherSrvs) + putSMPSubs :: Text -> NtfSMPSubMetrics -> IO () + putSMPSubs name NtfSMPSubMetrics {ownSrvSubs, otherServers, otherSrvSubCount} = do + showServers name (M.keys ownSrvSubs) otherServers + let ownSrvSubCount = M.foldl' (+) 0 ownSrvSubs + T.hPutStrLn h $ name <> " total: " <> tshow (ownSrvSubCount + otherSrvSubCount) + T.hPutStrLn h $ name <> " on own servers: " <> tshow ownSrvSubCount + when (r == CPRAdmin && not (M.null ownSrvSubs)) $ + forM_ (M.assocs ownSrvSubs) $ \(host, cnt) -> + T.hPutStrLn h $ name <> " on " <> host <> ": " <> tshow cnt + T.hPutStrLn h $ name <> " on other servers: " <> tshow otherSrvSubCount + putSMPWorkers :: Text -> NtfSMPWorkerMetrics -> IO () + putSMPWorkers name NtfSMPWorkerMetrics {ownServers, otherServers} = showServers name ownServers otherServers + showServers :: Text -> [Text] -> Int -> IO () + showServers name ownServers otherServers = do + T.hPutStrLn h $ name <> " own servers count: " <> tshow (length ownServers) + when (r == CPRAdmin) $ T.hPutStrLn h $ name <> " own servers: " <> T.intercalate "," ownServers + T.hPutStrLn h $ name <> " other servers count: " <> tshow otherServers CPHelp -> hPutStrLn h "commands: stats, stats-rts, server-info, help, quit" CPQuit -> pure () CPSkip -> pure () diff --git a/src/Simplex/Messaging/Notifications/Server/Env.hs b/src/Simplex/Messaging/Notifications/Server/Env.hs index 46f3e9f2d..6488cbb11 100644 --- a/src/Simplex/Messaging/Notifications/Server/Env.hs +++ b/src/Simplex/Messaging/Notifications/Server/Env.hs @@ -59,6 +59,9 
@@ data NtfServerConfig = NtfServerConfig logStatsStartTime :: Int64, serverStatsLogFile :: FilePath, serverStatsBackupFile :: Maybe FilePath, + -- | interval and file to save prometheus metrics + prometheusInterval :: Maybe Int, + prometheusMetricsFile :: FilePath, ntfServerVRange :: VersionRangeNTF, transportConfig :: TransportServerConfig, startOptions :: StartOptions diff --git a/src/Simplex/Messaging/Notifications/Server/Main.hs b/src/Simplex/Messaging/Notifications/Server/Main.hs index aa0e036ba..be3210be1 100644 --- a/src/Simplex/Messaging/Notifications/Server/Main.hs +++ b/src/Simplex/Messaging/Notifications/Server/Main.hs @@ -44,7 +44,7 @@ import Simplex.Messaging.Server.StoreLog (closeStoreLog) import Simplex.Messaging.Transport (ATransport, simplexMQVersion) import Simplex.Messaging.Transport.Client (TransportHost (..)) import Simplex.Messaging.Transport.Server (AddHTTP, ServerCredentials (..), TransportServerConfig (..), defaultTransportServerConfig) -import Simplex.Messaging.Util (ifM, tshow) +import Simplex.Messaging.Util (eitherToMaybe, ifM, tshow) import System.Directory (createDirectoryIfMissing, doesFileExist, renameFile) import System.Exit (exitFailure) import System.FilePath (combine) @@ -267,6 +267,8 @@ ntfServerCLI cfgPath logPath = logStatsStartTime = 0, -- seconds from 00:00 UTC serverStatsLogFile = combine logPath "ntf-server-stats.daily.log", serverStatsBackupFile = logStats $> combine logPath "ntf-server-stats.log", + prometheusInterval = eitherToMaybe $ read . 
T.unpack <$> lookupValue "STORE_LOG" "prometheus_interval" ini, + prometheusMetricsFile = combine logPath "ntf-server-metrics.txt", ntfServerVRange = supportedServerNTFVRange, transportConfig = defaultTransportServerConfig diff --git a/src/Simplex/Messaging/Notifications/Server/Prometheus.hs b/src/Simplex/Messaging/Notifications/Server/Prometheus.hs new file mode 100644 index 000000000..a3399c27f --- /dev/null +++ b/src/Simplex/Messaging/Notifications/Server/Prometheus.hs @@ -0,0 +1,251 @@ +{-# LANGUAGE DuplicateRecordFields #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE TypeApplications #-} +{-# OPTIONS_GHC -fno-warn-unrecognised-pragmas #-} + +module Simplex.Messaging.Notifications.Server.Prometheus where + +import Data.Int (Int64) +import qualified Data.Map.Strict as M +import Data.Text (Text) +import qualified Data.Text as T +import Data.Time.Clock (UTCTime (..), diffUTCTime) +import Data.Time.Clock.System (systemEpochDay) +import Data.Time.Format.ISO8601 (iso8601Show) +import Simplex.Messaging.Notifications.Server.Stats +import Simplex.Messaging.Server.Stats (PeriodStatCounts (..)) +import Simplex.Messaging.Transport (simplexMQVersion) + +data NtfServerMetrics = NtfServerMetrics + { statsData :: NtfServerStatsData, + activeTokensCounts :: PeriodStatCounts, + activeSubsCounts :: PeriodStatCounts, + tokenCount :: Int64, + approxSubCount :: Int64, + lastNtfCount :: Int64, + rtsOptions :: Text + } + +rtsOptionsEnv :: Text +rtsOptionsEnv = "NTF_RTS_OPTIONS" + +data NtfRealTimeMetrics = NtfRealTimeMetrics + { threadsCount :: Int, + srvSubscribers :: NtfSMPWorkerMetrics, -- smpSubscribers + srvClients :: NtfSMPWorkerMetrics, -- smpClients + srvSubWorkers :: NtfSMPWorkerMetrics, -- smpSubWorkers + ntfActiveSubs :: NtfSMPSubMetrics, -- srvSubs + ntfPendingSubs :: NtfSMPSubMetrics, -- pendingSrvSubs + smpSessionCount :: Int, -- smpSessions + apnsPushQLength :: Int -- lengthTBQueue pushQ + } + +data NtfSMPWorkerMetrics = 
NtfSMPWorkerMetrics {ownServers :: [Text], otherServers :: Int} + +data NtfSMPSubMetrics = NtfSMPSubMetrics {ownSrvSubs :: M.Map Text Int, otherServers :: Int, otherSrvSubCount :: Int} + +{-# FOURMOLU_DISABLE\n#-} +ntfPrometheusMetrics :: NtfServerMetrics -> NtfRealTimeMetrics -> UTCTime -> Text +ntfPrometheusMetrics sm rtm ts = + time <> tokens <> subscriptions <> notifications <> info + where + NtfServerMetrics {statsData, activeTokensCounts = psTkns, activeSubsCounts = psSubs, tokenCount, approxSubCount, lastNtfCount, rtsOptions} = sm + NtfRealTimeMetrics + { threadsCount, + srvSubscribers, + srvClients, + srvSubWorkers, + ntfActiveSubs, + ntfPendingSubs, + smpSessionCount, + apnsPushQLength + } = rtm + NtfServerStatsData + { _fromTime, + _tknCreated, + _tknVerified, + _tknDeleted, + _tknReplaced, + _subCreated, + _subDeleted, + _ntfReceived, + _ntfDelivered, + _ntfFailed, + _ntfCronDelivered, + _ntfCronFailed, + _ntfVrfQueued, + _ntfVrfDelivered, + _ntfVrfFailed, + _ntfVrfInvalidTkn + } = statsData + time = + "# Recorded at: " <> T.pack (iso8601Show ts) <> "\n\ + \# Stats from: " <> T.pack (iso8601Show _fromTime) <> "\n\ + \\n" + tokens = + "# Tokens\n\ + \# ------\n\ + \\n\ + \# HELP simplex_ntf_tokens_created Created tokens\n\ + \# TYPE simplex_ntf_tokens_created counter\n\ + \simplex_ntf_tokens_created " <> mshow _tknCreated <> "\n# tknCreated\n\ + \\n\ + \# HELP simplex_ntf_tokens_verified Verified tokens\n\ + \# TYPE simplex_ntf_tokens_verified counter\n\ + \simplex_ntf_tokens_verified " <> mshow _tknVerified <> "\n# tknVerified\n\ + \\n\ + \# HELP simplex_ntf_tokens_deleted Deleted tokens\n\ + \# TYPE simplex_ntf_tokens_deleted counter\n\ + \simplex_ntf_tokens_deleted " <> mshow _tknDeleted <> "\n# tknDeleted\n\ + \\n\ + \# HELP simplex_ntf_tokens_replaced Replaced tokens\n\ + \# TYPE simplex_ntf_tokens_replaced counter\n\ + \simplex_ntf_tokens_replaced " <> mshow _tknReplaced <> "\n# tknReplaced\n\ + \\n\ + \# HELP simplex_ntf_tokens_count_daily Daily 
active tokens\n\ + \# TYPE simplex_ntf_tokens_count_daily gauge\n\ + \simplex_ntf_tokens_count_daily " <> mstr (dayCount psTkns) <> "\n# dayCountTkn\n\ + \\n\ + \# HELP simplex_ntf_tokens_count_weekly Weekly active tokens\n\ + \# TYPE simplex_ntf_tokens_count_weekly gauge\n\ + \simplex_ntf_tokens_count_weekly " <> mstr (weekCount psTkns) <> "\n# weekCountTkn\n\ + \\n\ + \# HELP simplex_ntf_tokens_count_monthly Monthly active tokens\n\ + \# TYPE simplex_ntf_tokens_count_monthly gauge\n\ + \simplex_ntf_tokens_count_monthly " <> mstr (monthCount psTkns) <> "\n# monthCountTkn\n\ + \\n\ + \# HELP simplex_ntf_tokens_total Total number of tokens stored.\n\ + \# TYPE simplex_ntf_tokens_total gauge\n\ + \simplex_ntf_tokens_total " <> mshow tokenCount <> "\n# tokenCount\n\ + \\n" + subscriptions = + "# Subscriptions\n\ + \# -------------\n\ + \\n\ + \# HELP simplex_ntf_subscriptions_created Created subscriptions\n\ + \# TYPE simplex_ntf_subscriptions_created counter\n\ + \simplex_ntf_subscriptions_created " <> mshow _subCreated <> "\n# subCreated\n\ + \\n\ + \# HELP simplex_ntf_subscriptions_deleted Deleted subscriptions\n\ + \# TYPE simplex_ntf_subscriptions_deleted counter\n\ + \simplex_ntf_subscriptions_deleted " <> mshow _subDeleted <> "\n# subDeleted\n\ + \\n\ + \# HELP simplex_ntf_subscriptions_count_daily Daily subscriptions count\n\ + \# TYPE simplex_ntf_subscriptions_count_daily gauge\n\ + \simplex_ntf_subscriptions_count_daily " <> mstr (dayCount psSubs) <> "\n# dayCountSub\n\ + \\n\ + \# HELP simplex_ntf_subscriptions_count_weekly Weekly subscriptions count\n\ + \# TYPE simplex_ntf_subscriptions_count_weekly gauge\n\ + \simplex_ntf_subscriptions_count_weekly " <> mstr (weekCount psSubs) <> "\n# weekCountSub\n\ + \\n\ + \# HELP simplex_ntf_subscriptions_count_monthly Monthly subscriptions count\n\ + \# TYPE simplex_ntf_subscriptions_count_monthly gauge\n\ + \simplex_ntf_subscriptions_count_monthly " <> mstr (monthCount psSubs) <> "\n# monthCountSub\n\ + \\n\ + \# 
HELP simplex_ntf_subscriptions_approx_total Approximate total number of subscriptions stored.\n\ + \# TYPE simplex_ntf_subscriptions_approx_total gauge\n\ + \simplex_ntf_subscriptions_approx_total " <> mshow approxSubCount <> "\n# approxSubCount\n\ + \\n" + <> showSubMetric ntfActiveSubs "simplex_ntf_smp_subscription_active_" "Active" + <> showSubMetric ntfPendingSubs "simplex_ntf_smp_subscription_pending_" "Pending" + notifications = + "# Notifications\n\ + \# -------------\n\ + \\n\ + \# HELP simplex_ntf_notifications_received Received notifications\n\ + \# TYPE simplex_ntf_notifications_received counter\n\ + \simplex_ntf_notifications_received " <> mshow _ntfReceived <> "\n# ntfReceived\n\ + \\n\ + \# HELP simplex_ntf_notifications_delivered Delivered notifications\n\ + \# TYPE simplex_ntf_notifications_delivered counter\n\ + \simplex_ntf_notifications_delivered " <> mshow _ntfDelivered <> "\n# ntfDelivered\n\ + \\n\ + \# HELP simplex_ntf_notifications_failed Failed notifications\n\ + \# TYPE simplex_ntf_notifications_failed counter\n\ + \simplex_ntf_notifications_failed " <> mshow _ntfFailed <> "\n# ntfFailed\n\ + \\n\ + \# HELP simplex_ntf_notifications_periodic_delivered Delivered periodic notifications\n\ + \# TYPE simplex_ntf_notifications_periodic_delivered counter\n\ + \simplex_ntf_notifications_periodic_delivered " <> mshow _ntfCronDelivered <> "\n# ntfCronDelivered\n\ + \\n\ + \# HELP simplex_ntf_notifications_periodic_failed Failed periodic notifications\n\ + \# TYPE simplex_ntf_notifications_periodic_failed counter\n\ + \simplex_ntf_notifications_periodic_failed " <> mshow _ntfCronFailed <> "\n# ntfCronFailed\n\ + \\n\ + \# HELP simplex_ntf_notifications_verification_queued Token verifications queued\n\ + \# TYPE simplex_ntf_notifications_verification_queued counter\n\ + \simplex_ntf_notifications_verification_queued " <> mshow _ntfVrfQueued <> "\n# ntfVrfQueued\n\ + \\n\ + \# HELP simplex_ntf_notifications_verification_delivered Delivered token 
verifications\n\ + \# TYPE simplex_ntf_notifications_verification_delivered counter\n\ + \simplex_ntf_notifications_verification_delivered " <> mshow _ntfVrfDelivered <> "\n# ntfVrfDelivered\n\ + \\n\ + \# HELP simplex_ntf_notifications_verification_failed Failed token verification deliveries\n\ + \# TYPE simplex_ntf_notifications_verification_failed counter\n\ + \simplex_ntf_notifications_verification_failed " <> mshow _ntfVrfFailed <> "\n# ntfVrfFailed\n\ + \\n\ + \# HELP simplex_ntf_notifications_verification_invalid_tkn Invalid token errors while delivering verifications\n\ + \# TYPE simplex_ntf_notifications_verification_invalid_tkn counter\n\ + \simplex_ntf_notifications_verification_invalid_tkn " <> mshow _ntfVrfInvalidTkn <> "\n# ntfVrfInvalidTkn\n\ + \\n\ + \# HELP simplex_ntf_notifications_total Total number of last notifications stored.\n\ + \# TYPE simplex_ntf_notifications_total gauge\n\ + \simplex_ntf_notifications_total " <> mshow lastNtfCount <> "\n# lastNtfCount\n\ + \\n" + info = + "# Info\n\ + \# ----\n\ + \\n\ + \# HELP simplex_ntf_info Server information. 
RTS options have to be passed via " <> rtsOptionsEnv <> " env var\n\ + \# TYPE simplex_ntf_info gauge\n\ + \simplex_ntf_info{version=\"" <> T.pack simplexMQVersion <> "\",rts_options=\"" <> rtsOptions <> "\"} 1\n\ + \\n\ + \# HELP simplex_ntf_threads_total Thread count\n\ + \# TYPE simplex_ntf_threads_total gauge\n\ + \simplex_ntf_threads_total " <> mshow threadsCount <> "\n# threadsCount\n\ + \\n" + <> showWorkerMetric srvSubscribers "simplex_ntf_smp_subscribers_" "SMP subscribers" + <> showWorkerMetric srvClients "simplex_ntf_smp_agent_clients_" "SMP agent clients" + <> showWorkerMetric srvSubWorkers "simplex_ntf_smp_agent_sub_workers_" "SMP agent subscription workers" + <> "# HELP simplex_ntf_smp_sessions_count SMP sessions count\n\ + \# TYPE simplex_ntf_smp_sessions_count gauge\n\ + \simplex_ntf_smp_sessions_count " <> mshow smpSessionCount <> "\n# smpSessionCount\n\ + \\n\ + \# HELP simplex_ntf_apns_queue_length Count of notifications in push queue\n\ + \# TYPE simplex_ntf_apns_queue_length gauge\n\ + \simplex_ntf_apns_queue_length " <> mshow apnsPushQLength <> "\n# apnsPushQLength\n\ + \\n" + showSubMetric NtfSMPSubMetrics {ownSrvSubs, otherServers, otherSrvSubCount} mPfx descrPfx = + showOwnSrvSubs <> showOtherSrvSubs + where + showOwnSrvSubs + | M.null ownSrvSubs = showOwn_ "" 0 0 + | otherwise = T.concat $ map (\(host, cnt) -> showOwn_ (metricHost host) 1 cnt) $ M.assocs ownSrvSubs + showOwn_ param srvCnt subCnt = + gaugeMetric (mPfx <> "server_count_own") param srvCnt (descrPfx <> " SMP subscriptions, own server count") "ownSrvSubs server" + <> gaugeMetric (mPfx <> "sub_count_own") param subCnt (descrPfx <> " SMP subscriptions count for own servers") "ownSrvSubs count" + showOtherSrvSubs = + gaugeMetric (mPfx <> "server_count_other") "" otherServers (descrPfx <> " SMP subscriptions, other server count") "otherServers" + <> gaugeMetric (mPfx <> "sub_count_other") "" otherSrvSubCount (descrPfx <> " SMP subscriptions count for other servers") 
"otherSrvSubCount" + showWorkerMetric NtfSMPWorkerMetrics {ownServers, otherServers} mPfx descrPfx = + showOwnServers <> showOtherServers + where + showOwnServers + | null ownServers = showOwn_ "" 0 + | otherwise = T.concat $ map (\host -> showOwn_ (metricHost host) 1) ownServers + showOwn_ param cnt = gaugeMetric (mPfx <> "count_own") param cnt (descrPfx <> " count for own servers") "ownServers" + showOtherServers = gaugeMetric (mPfx <> "count_other") "" otherServers (descrPfx <> " count for other servers") "otherServers" + gaugeMetric :: Text -> Text -> Int -> Text -> Text -> Text + gaugeMetric name param value descr codeRef = + "# HELP " <> name <> " " <> descr <> "\n\ + \# TYPE " <> name <> " gauge\n\ + \" <> name <> param <> " " <> mshow value <> "\n# " <> codeRef <> "\n\ + \\n" + metricHost host = "{server=\"" <> host <> "\"}" + mstr a = T.pack a <> " " <> tsEpoch + mshow :: Show a => a -> Text + mshow = mstr . show + tsEpoch = T.pack $ show @Int64 $ floor @Double $ realToFrac (ts `diffUTCTime` epoch) * 1000 + epoch = UTCTime systemEpochDay 0 +{-# FOURMOLU_ENABLE\n#-} diff --git a/src/Simplex/Messaging/Notifications/Server/Store/Postgres.hs b/src/Simplex/Messaging/Notifications/Server/Store/Postgres.hs index 8a8c475ac..3aa1e7e31 100644 --- a/src/Simplex/Messaging/Notifications/Server/Store/Postgres.hs +++ b/src/Simplex/Messaging/Notifications/Server/Store/Postgres.hs @@ -585,6 +585,17 @@ toLastNtf :: SMPQueueNtfRow :. (SystemTime, C.CbNonce, Binary EncNMsgMeta) -> PN toLastNtf (qRow :. 
(ts, nonce, Binary encMeta)) = PNMessageData {smpQueue = rowToSMPQueue qRow, ntfTs = ts, nmsgNonce = nonce, encNMsgMeta = encMeta} +getEntityCounts :: NtfPostgresStore -> IO (Int64, Int64, Int64) +getEntityCounts st = + fmap (fromRight (0, 0, 0)) $ withDB' "getEntityCounts" st $ \db -> do + tCnt <- count <$> DB.query_ db "SELECT count(1) FROM tokens" + sCnt <- count <$> DB.query_ db "SELECT reltuples::BIGINT FROM pg_class WHERE relname = 'subscriptions' AND relkind = 'r'" + nCnt <- count <$> DB.query_ db "SELECT count(1) FROM last_notifications" + pure (tCnt, sCnt, nCnt) + where + count (Only n : _) = n + count [] = 0 + importNtfSTMStore :: NtfPostgresStore -> NtfSTMStore -> IO (Int64, Int64, Int64) importNtfSTMStore NtfPostgresStore {dbStore = s} stmStore = do (tIds, tCnt) <- importTokens diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 32534ccf9..2f3dea57f 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -114,6 +114,7 @@ import Simplex.Messaging.Transport.Buffer (trimCR) import Simplex.Messaging.Transport.Server import Simplex.Messaging.Util import Simplex.Messaging.Version +import System.Environment (lookupEnv) import System.Exit (exitFailure, exitSuccess) import System.IO (hPrint, hPutStrLn, hSetNewlineMode, universalNewlineMode) import System.Mem.Weak (deRefWeak) @@ -562,21 +563,22 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt AMS _ _ st <- asks msgStore ss <- asks serverStats env <- ask + rtsOpts <- liftIO $ maybe ("set " <> rtsOptionsEnv) T.pack <$> lookupEnv (T.unpack rtsOptionsEnv) let interval = 1000000 * saveInterval liftIO $ forever $ do threadDelay interval ts <- getCurrentTime - sm <- getServerMetrics st ss + sm <- getServerMetrics st ss rtsOpts rtm <- getRealTimeMetrics env T.writeFile metricsFile $ prometheusMetrics sm rtm ts - getServerMetrics :: forall s. 
MsgStoreClass s => s -> ServerStats -> IO ServerMetrics - getServerMetrics st ss = do + getServerMetrics :: forall s. MsgStoreClass s => s -> ServerStats -> Text -> IO ServerMetrics + getServerMetrics st ss rtsOptions = do d <- getServerStatsData ss let ps = periodStatDataCounts $ _activeQueues d psNtf = periodStatDataCounts $ _activeQueuesNtf d QueueCounts {queueCount, notifierCount} <- queueCounts @(StoreQueue s) $ queueStore st - pure ServerMetrics {statsData = d, activeQueueCounts = ps, activeNtfCounts = psNtf, queueCount, notifierCount} + pure ServerMetrics {statsData = d, activeQueueCounts = ps, activeNtfCounts = psNtf, queueCount, notifierCount, rtsOptions} getRealTimeMetrics :: Env -> IO RealTimeMetrics getRealTimeMetrics Env {clients, sockets, msgStore = AMS _ _ ms, server = Server {subscribers, notifiers, subClients, ntfSubClients}} = do diff --git a/src/Simplex/Messaging/Server/Prometheus.hs b/src/Simplex/Messaging/Server/Prometheus.hs index a542a87f1..39dbc854f 100644 --- a/src/Simplex/Messaging/Server/Prometheus.hs +++ b/src/Simplex/Messaging/Server/Prometheus.hs @@ -14,6 +14,7 @@ import Data.Time.Format.ISO8601 (iso8601Show) import Network.Socket (ServiceName) import Simplex.Messaging.Server.MsgStore.Types (LoadedQueueCounts (..)) import Simplex.Messaging.Server.Stats +import Simplex.Messaging.Transport (simplexMQVersion) import Simplex.Messaging.Transport.Server (SocketStats (..)) data ServerMetrics = ServerMetrics @@ -21,9 +22,13 @@ data ServerMetrics = ServerMetrics activeQueueCounts :: PeriodStatCounts, activeNtfCounts :: PeriodStatCounts, queueCount :: Int, - notifierCount :: Int + notifierCount :: Int, + rtsOptions :: Text } +rtsOptionsEnv :: Text +rtsOptionsEnv = "SMP_RTS_OPTIONS" + data RealTimeMetrics = RealTimeMetrics { socketStats :: [(ServiceName, SocketStats)], threadsCount :: Int, @@ -40,7 +45,7 @@ prometheusMetrics :: ServerMetrics -> RealTimeMetrics -> UTCTime -> Text prometheusMetrics sm rtm ts = time <> queues <> subscriptions <> 
messages <> ntfMessages <> ntfs <> relays <> info where - ServerMetrics {statsData, activeQueueCounts = ps, activeNtfCounts = psNtf, queueCount, notifierCount} = sm + ServerMetrics {statsData, activeQueueCounts = ps, activeNtfCounts = psNtf, queueCount, notifierCount, rtsOptions} = sm RealTimeMetrics { socketStats, threadsCount, @@ -87,10 +92,8 @@ prometheusMetrics sm rtm ts = _msgGetDuplicate, _msgGetProhibited, _msgExpired, - _activeQueues, _msgSentNtf, _msgRecvNtf, - _activeQueuesNtf, _msgNtfs, _msgNtfsB, _msgNtfNoSub, @@ -347,6 +350,10 @@ prometheusMetrics sm rtm ts = info = "# Info\n\ \# ----\n\ + \\n\ + \# HELP simplex_smp_info Server information. RTS options have to be passed via " <> rtsOptionsEnv <> " env var\n\ + \# TYPE simplex_smp_info gauge\n\ + \simplex_smp_info{version=\"" <> T.pack simplexMQVersion <> "\",rts_options=\"" <> rtsOptions <> "\"} 1\n\ \\n" <> socketsMetric socketsAccepted "simplex_smp_sockets_accepted" "Accepted sockets" <> socketsMetric socketsClosed "simplex_smp_sockets_closed" "Closed sockets" diff --git a/tests/AgentTests/NotificationTests.hs b/tests/AgentTests/NotificationTests.hs index ce69e5c11..a5a3e4069 100644 --- a/tests/AgentTests/NotificationTests.hs +++ b/tests/AgentTests/NotificationTests.hs @@ -570,7 +570,7 @@ testNotificationSubscriptionExistingConnection apns baseId alice@AgentClient {ag threadDelay 500000 suspendAgent alice 0 closeDBStore store - threadDelay 500000 >> callCommand "sync" >> threadDelay 500000 + threadDelay 1000000 >> callCommand "sync" >> threadDelay 1000000 putStrLn "before opening the database from another agent" -- aliceNtf client doesn't have subscription and is allowed to get notification message @@ -578,7 +578,7 @@ testNotificationSubscriptionExistingConnection apns baseId alice@AgentClient {ag (Just SMPMsgMeta {msgFlags = MsgFlags True}) :| _ <- getConnectionMessages aliceNtf [cId] pure () - threadDelay 500000 >> callCommand "sync" >> threadDelay 500000 + threadDelay 1000000 >> callCommand "sync" 
>> threadDelay 1000000 putStrLn "after closing the database in another agent" reopenDBStore store foregroundAgent alice diff --git a/tests/NtfClient.hs b/tests/NtfClient.hs index e7a7c2ba5..22bb3abaa 100644 --- a/tests/NtfClient.hs +++ b/tests/NtfClient.hs @@ -80,6 +80,9 @@ ntfTestStoreLogFile2 = "tests/tmp/ntf-server-store.log.2" ntfTestStoreLastNtfsFile :: FilePath ntfTestStoreLastNtfsFile = "tests/tmp/ntf-server-last-notifications.log" +ntfTestPrometheusMetricsFile :: FilePath +ntfTestPrometheusMetricsFile = "tests/tmp/ntf-server-metrics.txt" + ntfTestStoreDBOpts :: DBOpts ntfTestStoreDBOpts = DBOpts @@ -154,6 +157,8 @@ ntfServerCfg = logStatsStartTime = 0, serverStatsLogFile = "tests/ntf-server-stats.daily.log", serverStatsBackupFile = Nothing, + prometheusInterval = Nothing, + prometheusMetricsFile = ntfTestPrometheusMetricsFile, ntfServerVRange = supportedServerNTFVRange, transportConfig = defaultTransportServerConfig, startOptions = StartOptions {maintenance = False, compactLog = False, skipWarnings = False, confirmMigrations = MCYesUp}