mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-25 09:54:29 +00:00
smp server: additional control port commands to monitor server state (#1228)
* smp server: additional control port commands to monitor server state * fix * space
This commit is contained in:
committed by
GitHub
parent
8dd54ced0e
commit
492d2f86bc
@@ -6,7 +6,6 @@
|
||||
{-# LANGUAGE GADTs #-}
|
||||
{-# LANGUAGE KindSignatures #-}
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE MultiWayIf #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE NumericUnderscores #-}
|
||||
{-# LANGUAGE OverloadedLists #-}
|
||||
@@ -60,6 +59,7 @@ import Data.List.NonEmpty (NonEmpty (..))
|
||||
import qualified Data.List.NonEmpty as L
|
||||
import qualified Data.Map.Strict as M
|
||||
import Data.Maybe (catMaybes, fromMaybe, isJust, isNothing)
|
||||
import qualified Data.Set as S
|
||||
import qualified Data.Text as T
|
||||
import Data.Text.Encoding (decodeLatin1)
|
||||
import Data.Time.Clock (UTCTime (..), diffTimeToPicoseconds, getCurrentTime)
|
||||
@@ -377,7 +377,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
|
||||
let age = systemSeconds now - systemSeconds createdAt
|
||||
subscriptions' <- bshow . M.size <$> readTVarIO subscriptions
|
||||
hPutStrLn h . B.unpack $ B.intercalate "," [bshow cid, encode sessionId, connected', strEncode createdAt, rcvActiveAt', sndActiveAt', bshow age, subscriptions']
|
||||
CPStats -> withAdminRole $ do
|
||||
CPStats -> withUserRole $ do
|
||||
ss <- unliftIO u $ asks serverStats
|
||||
let putStat :: Show a => ByteString -> (ServerStats -> TVar a) -> IO ()
|
||||
putStat label var = readTVarIO (var ss) >>= \v -> B.hPutStr h $ label <> ": " <> bshow v <> "\n"
|
||||
@@ -391,6 +391,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
|
||||
putStat "qDeletedAll" qDeletedAll
|
||||
putStat "qDeletedNew" qDeletedNew
|
||||
putStat "qDeletedSecured" qDeletedSecured
|
||||
readTVarIO (day $ activeQueues ss) >>= \v -> B.hPutStr h $ "dayMsgQueues" <> ": " <> bshow (S.size v) <> "\n"
|
||||
putStat "msgSent" msgSent
|
||||
putStat "msgRecv" msgRecv
|
||||
putStat "msgSentNtf" msgSentNtf
|
||||
@@ -414,7 +415,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
|
||||
#else
|
||||
hPutStrLn h "Not available on GHC 8.10"
|
||||
#endif
|
||||
CPSockets -> withAdminRole $ do
|
||||
CPSockets -> withUserRole $ do
|
||||
(accepted', closed', active') <- unliftIO u $ asks sockets
|
||||
(accepted, closed, active) <- atomically $ (,,) <$> readTVar accepted' <*> readTVar closed' <*> readTVar active'
|
||||
hPutStrLn h "Sockets: "
|
||||
@@ -436,6 +437,43 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
|
||||
#else
|
||||
hPutStrLn h "Not available on GHC 8.10"
|
||||
#endif
|
||||
CPServerInfo -> readTVarIO role >>= \case
|
||||
CPRNone -> do
|
||||
logError "Unauthorized control port command"
|
||||
hPutStrLn h "AUTH"
|
||||
r -> do
|
||||
#if MIN_VERSION_base(4,18,0)
|
||||
threads <- liftIO listThreads
|
||||
hPutStrLn h $ "Threads: " <> show (length threads)
|
||||
#else
|
||||
hPutStrLn h "Threads: not available on GHC 8.10"
|
||||
#endif
|
||||
Env {clients, server = Server {subscribers, notifiers}} <- unliftIO u ask
|
||||
activeClients <- readTVarIO clients
|
||||
hPutStrLn h $ "Clients: " <> show (IM.size activeClients)
|
||||
when (r == CPRAdmin) $ do
|
||||
(smpSubCnt, smpClCnt) <- countClientSubs subscriptions activeClients
|
||||
(ntfSubCnt, ntfClCnt) <- countClientSubs ntfSubscriptions activeClients
|
||||
hPutStrLn h $ "SMP subscriptions (via clients, slow): " <> show smpSubCnt
|
||||
hPutStrLn h $ "SMP subscribed clients (via clients, slow): " <> show smpClCnt
|
||||
hPutStrLn h $ "Ntf subscriptions (via clients, slow): " <> show ntfSubCnt
|
||||
hPutStrLn h $ "Ntf subscribed clients (via clients, slow): " <> show ntfClCnt
|
||||
activeSubs <- readTVarIO subscribers
|
||||
activeNtfSubs <- readTVarIO notifiers
|
||||
hPutStrLn h $ "SMP subscriptions: " <> show (M.size activeSubs)
|
||||
hPutStrLn h $ "SMP subscribed clients: " <> show (countSubClients activeSubs)
|
||||
hPutStrLn h $ "Ntf subscriptions: " <> show (M.size activeNtfSubs)
|
||||
hPutStrLn h $ "Ntf subscribed clients: " <> show (countSubClients activeNtfSubs)
|
||||
where
|
||||
countClientSubs :: (Client -> TMap QueueId a) -> IM.IntMap Client -> IO (Int, Int)
|
||||
countClientSubs subSel = foldM addSubs (0, 0)
|
||||
where
|
||||
addSubs :: (Int, Int) -> Client -> IO (Int, Int)
|
||||
addSubs (subCnt, clCnt) cl = do
|
||||
subs <- readTVarIO $ subSel cl
|
||||
let cnt = M.size subs
|
||||
pure (subCnt + cnt, clCnt + if cnt == 0 then 0 else 1)
|
||||
countSubClients = S.size . M.foldr' (S.insert . clientId) S.empty
|
||||
CPDelete queueId' -> withUserRole $ unliftIO u $ do
|
||||
st <- asks queueStore
|
||||
ms <- asks msgStore
|
||||
@@ -455,7 +493,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
|
||||
hPutStrLn h "saving server state..."
|
||||
unliftIO u $ saveServer True
|
||||
hPutStrLn h "server state saved!"
|
||||
CPHelp -> hPutStrLn h "commands: stats, stats-rts, clients, sockets, socket-threads, threads, delete, save, help, quit"
|
||||
CPHelp -> hPutStrLn h "commands: stats, stats-rts, clients, sockets, socket-threads, threads, server-info, delete, save, help, quit"
|
||||
CPQuit -> pure ()
|
||||
CPSkip -> pure ()
|
||||
where
|
||||
|
||||
@@ -9,6 +9,7 @@ import Simplex.Messaging.Encoding.String
|
||||
import Simplex.Messaging.Protocol (BasicAuth)
|
||||
|
||||
data CPClientRole = CPRNone | CPRUser | CPRAdmin
|
||||
deriving (Eq)
|
||||
|
||||
data ControlProtocol
|
||||
= CPAuth BasicAuth
|
||||
@@ -20,6 +21,7 @@ data ControlProtocol
|
||||
| CPThreads
|
||||
| CPSockets
|
||||
| CPSocketThreads
|
||||
| CPServerInfo
|
||||
| CPDelete ByteString
|
||||
| CPSave
|
||||
| CPHelp
|
||||
@@ -37,6 +39,7 @@ instance StrEncoding ControlProtocol where
|
||||
CPThreads -> "threads"
|
||||
CPSockets -> "sockets"
|
||||
CPSocketThreads -> "socket-threads"
|
||||
CPServerInfo -> "server-info"
|
||||
CPDelete bs -> "delete " <> strEncode bs
|
||||
CPSave -> "save"
|
||||
CPHelp -> "help"
|
||||
@@ -53,6 +56,7 @@ instance StrEncoding ControlProtocol where
|
||||
"threads" -> pure CPThreads
|
||||
"sockets" -> pure CPSockets
|
||||
"socket-threads" -> pure CPSocketThreads
|
||||
"server-info" -> pure CPServerInfo
|
||||
"delete" -> CPDelete <$> (A.space *> strP)
|
||||
"save" -> pure CPSave
|
||||
"help" -> pure CPHelp
|
||||
|
||||
Reference in New Issue
Block a user