From 492d2f86bc5fa23f33f0b5e743c9b44e1d19049b Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Sat, 13 Jul 2024 22:34:10 +0100 Subject: [PATCH] smp server: additional control port commands to monitor server state (#1228) * smp server: additional control port commands to monitor server state * fix * space --- src/Simplex/Messaging/Server.hs | 46 ++++++++++++++++++++++--- src/Simplex/Messaging/Server/Control.hs | 4 +++ 2 files changed, 46 insertions(+), 4 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 014bcd366..d88b2349a 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -6,7 +6,6 @@ {-# LANGUAGE GADTs #-} {-# LANGUAGE KindSignatures #-} {-# LANGUAGE LambdaCase #-} -{-# LANGUAGE MultiWayIf #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE NumericUnderscores #-} {-# LANGUAGE OverloadedLists #-} @@ -60,6 +59,7 @@ import Data.List.NonEmpty (NonEmpty (..)) import qualified Data.List.NonEmpty as L import qualified Data.Map.Strict as M import Data.Maybe (catMaybes, fromMaybe, isJust, isNothing) +import qualified Data.Set as S import qualified Data.Text as T import Data.Text.Encoding (decodeLatin1) import Data.Time.Clock (UTCTime (..), diffTimeToPicoseconds, getCurrentTime) @@ -377,7 +377,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do let age = systemSeconds now - systemSeconds createdAt subscriptions' <- bshow . M.size <$> readTVarIO subscriptions hPutStrLn h . B.unpack $ B.intercalate "," [bshow cid, encode sessionId, connected', strEncode createdAt, rcvActiveAt', sndActiveAt', bshow age, subscriptions'] - CPStats -> withAdminRole $ do + CPStats -> withUserRole $ do ss <- unliftIO u $ asks serverStats let putStat :: Show a => ByteString -> (ServerStats -> TVar a) -> IO () putStat label var = readTVarIO (var ss) >>= \v -> B.hPutStr h $ label <> ": " <> bshow v <> "\n" @@ -391,6 +391,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do putStat "qDeletedAll" qDeletedAll putStat "qDeletedNew" qDeletedNew putStat "qDeletedSecured" qDeletedSecured + readTVarIO (day $ activeQueues ss) >>= \v -> B.hPutStr h $ "dayMsgQueues" <> ": " <> bshow (S.size v) <> "\n" putStat "msgSent" msgSent putStat "msgRecv" msgRecv putStat "msgSentNtf" msgSentNtf @@ -414,7 +415,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do #else hPutStrLn h "Not available on GHC 8.10" #endif - CPSockets -> withAdminRole $ do + CPSockets -> withUserRole $ do (accepted', closed', active') <- unliftIO u $ asks sockets (accepted, closed, active) <- atomically $ (,,) <$> readTVar accepted' <*> readTVar closed' <*> readTVar active' hPutStrLn h "Sockets: " @@ -436,6 +437,43 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do #else hPutStrLn h "Not available on GHC 8.10" #endif + CPServerInfo -> readTVarIO role >>= \case + CPRNone -> do + logError "Unauthorized control port command" + hPutStrLn h "AUTH" + r -> do +#if MIN_VERSION_base(4,18,0) + threads <- liftIO listThreads + hPutStrLn h $ "Threads: " <> show (length threads) +#else + hPutStrLn h "Threads: not available on GHC 8.10" +#endif + Env {clients, server = Server {subscribers, notifiers}} <- unliftIO u ask + activeClients <- readTVarIO clients + hPutStrLn h $ "Clients: " <> show (IM.size activeClients) + when (r == CPRAdmin) $ do + (smpSubCnt, smpClCnt) <- countClientSubs subscriptions activeClients + (ntfSubCnt, ntfClCnt) <- countClientSubs ntfSubscriptions activeClients + hPutStrLn h $ "SMP subscriptions (via clients, slow): " <> show smpSubCnt + hPutStrLn h $ "SMP subscribed clients (via clients, slow): " <> show smpClCnt + hPutStrLn h $ "Ntf subscriptions (via clients, slow): " <> show ntfSubCnt + hPutStrLn h $ "Ntf subscribed clients (via clients, slow): " <> show ntfClCnt + activeSubs <- readTVarIO subscribers + activeNtfSubs <- readTVarIO notifiers + hPutStrLn h $ "SMP subscriptions: " <> show (M.size activeSubs) + hPutStrLn h $ "SMP subscribed clients: " <> show (countSubClients activeSubs) + hPutStrLn h $ "Ntf subscriptions: " <> show (M.size activeNtfSubs) + hPutStrLn h $ "Ntf subscribed clients: " <> show (countSubClients activeNtfSubs) + where + countClientSubs :: (Client -> TMap QueueId a) -> IM.IntMap Client -> IO (Int, Int) + countClientSubs subSel = foldM addSubs (0, 0) + where + addSubs :: (Int, Int) -> Client -> IO (Int, Int) + addSubs (subCnt, clCnt) cl = do + subs <- readTVarIO $ subSel cl + let cnt = M.size subs + pure (subCnt + cnt, clCnt + if cnt == 0 then 0 else 1) + countSubClients = S.size . M.foldr' (S.insert . clientId) S.empty CPDelete queueId' -> withUserRole $ unliftIO u $ do st <- asks queueStore ms <- asks msgStore @@ -455,7 +493,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do hPutStrLn h "saving server state..." unliftIO u $ saveServer True hPutStrLn h "server state saved!" - CPHelp -> hPutStrLn h "commands: stats, stats-rts, clients, sockets, socket-threads, threads, delete, save, help, quit" + CPHelp -> hPutStrLn h "commands: stats, stats-rts, clients, sockets, socket-threads, threads, server-info, delete, save, help, quit" CPQuit -> pure () CPSkip -> pure () where diff --git a/src/Simplex/Messaging/Server/Control.hs b/src/Simplex/Messaging/Server/Control.hs index 9463fa777..b4c74e4ac 100644 --- a/src/Simplex/Messaging/Server/Control.hs +++ b/src/Simplex/Messaging/Server/Control.hs @@ -9,6 +9,7 @@ import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol (BasicAuth) data CPClientRole = CPRNone | CPRUser | CPRAdmin + deriving (Eq) data ControlProtocol = CPAuth BasicAuth @@ -20,6 +21,7 @@ data ControlProtocol | CPThreads | CPSockets | CPSocketThreads + | CPServerInfo | CPDelete ByteString | CPSave | CPHelp @@ -37,6 +39,7 @@ instance StrEncoding ControlProtocol where CPThreads -> "threads" CPSockets -> "sockets" CPSocketThreads -> "socket-threads" + CPServerInfo -> "server-info" CPDelete bs -> "delete " <> strEncode bs CPSave -> "save" CPHelp -> "help" @@ -53,6 +56,7 @@ instance StrEncoding ControlProtocol where "threads" -> pure CPThreads "sockets" -> pure CPSockets "socket-threads" -> pure CPSocketThreads + "server-info" -> pure CPServerInfo "delete" -> CPDelete <$> (A.space *> strP) "save" -> pure CPSave "help" -> pure CPHelp