From 7bdae793cb2c59edab4f8848fa65bd437eb747e8 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Sat, 26 Aug 2023 16:02:18 +0100 Subject: [PATCH] server: add control port commands for clients and ghc threads (#836) * server: add control port commands for clients and ghc threads (#835) * Add stats-rts control query With supporting ghc-options that would provide the data. * Add CPSkip command Allows spamming empty lines a few times to clean up the view. * server: Add CP commands to enumerate clients and threads * style --------- Co-authored-by: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> * use base64 encoding for session ID * fromMaybe * whitespace --------- Co-authored-by: Alexander Bondarenko <486682+dpwiz@users.noreply.github.com> --- package.yaml | 5 ++ simplexmq.cabal | 10 +-- src/Simplex/Messaging/Server.hs | 78 +++++++++++++++++------ src/Simplex/Messaging/Server/Control.hs | 9 +++ src/Simplex/Messaging/Transport/Server.hs | 3 +- src/Simplex/Messaging/Util.hs | 6 ++ 6 files changed, 84 insertions(+), 27 deletions(-) diff --git a/package.yaml b/package.yaml index 3fe146567..bec6554b2 100644 --- a/package.yaml +++ b/package.yaml @@ -97,6 +97,7 @@ executables: - simplexmq ghc-options: - -threaded + - -rtsopts ntf-server: source-dirs: apps/ntf-server @@ -105,6 +106,7 @@ executables: - simplexmq ghc-options: - -threaded + - -rtsopts xftp-server: source-dirs: apps/xftp-server @@ -113,6 +115,7 @@ executables: - simplexmq ghc-options: - -threaded + - -rtsopts smp-agent: source-dirs: apps/smp-agent @@ -121,6 +124,7 @@ executables: - simplexmq ghc-options: - -threaded + - -rtsopts xftp: source-dirs: apps/xftp @@ -129,6 +133,7 @@ executables: - simplexmq ghc-options: - -threaded + - -rtsopts tests: simplexmq-test: diff --git a/simplexmq.cabal b/simplexmq.cabal index af5274306..c0ddc0a57 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -203,7 +203,7 @@ executable ntf-server Paths_simplexmq hs-source-dirs: apps/ntf-server - ghc-options: -Wall -Wcompat -Werror=incomplete-patterns -Wredundant-constraints -Wincomplete-record-updates -Wincomplete-uni-patterns -Wunused-type-patterns -threaded + ghc-options: -Wall -Wcompat -Werror=incomplete-patterns -Wredundant-constraints -Wincomplete-record-updates -Wincomplete-uni-patterns -Wunused-type-patterns -threaded -rtsopts build-depends: QuickCheck ==2.14.* , aeson ==2.2.* @@ -268,7 +268,7 @@ executable smp-agent Paths_simplexmq hs-source-dirs: apps/smp-agent - ghc-options: -Wall -Wcompat -Werror=incomplete-patterns -Wredundant-constraints -Wincomplete-record-updates -Wincomplete-uni-patterns -Wunused-type-patterns -threaded + ghc-options: -Wall -Wcompat -Werror=incomplete-patterns -Wredundant-constraints -Wincomplete-record-updates -Wincomplete-uni-patterns -Wunused-type-patterns -threaded -rtsopts build-depends: QuickCheck ==2.14.* , aeson ==2.2.* @@ -333,7 +333,7 @@ executable smp-server Paths_simplexmq hs-source-dirs: apps/smp-server - ghc-options: -Wall -Wcompat -Werror=incomplete-patterns -Wredundant-constraints -Wincomplete-record-updates -Wincomplete-uni-patterns -Wunused-type-patterns -threaded + ghc-options: -Wall -Wcompat -Werror=incomplete-patterns -Wredundant-constraints -Wincomplete-record-updates -Wincomplete-uni-patterns -Wunused-type-patterns -threaded -rtsopts build-depends: QuickCheck ==2.14.* , aeson ==2.2.* @@ -398,7 +398,7 @@ executable xftp Paths_simplexmq hs-source-dirs: apps/xftp - ghc-options: -Wall -Wcompat -Werror=incomplete-patterns -Wredundant-constraints -Wincomplete-record-updates -Wincomplete-uni-patterns -Wunused-type-patterns -threaded + ghc-options: -Wall -Wcompat -Werror=incomplete-patterns -Wredundant-constraints -Wincomplete-record-updates -Wincomplete-uni-patterns -Wunused-type-patterns -threaded -rtsopts build-depends: QuickCheck ==2.14.* , aeson ==2.2.* @@ -463,7 +463,7 @@ executable xftp-server Paths_simplexmq hs-source-dirs: apps/xftp-server - ghc-options: -Wall -Wcompat -Werror=incomplete-patterns -Wredundant-constraints -Wincomplete-record-updates -Wincomplete-uni-patterns -Wunused-type-patterns -threaded + ghc-options: -Wall -Wcompat -Werror=incomplete-patterns -Wredundant-constraints -Wincomplete-record-updates -Wincomplete-uni-patterns -Wunused-type-patterns -threaded -rtsopts build-depends: QuickCheck ==2.14.* , aeson ==2.2.* diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index b25b1d66b..b22b8b7fc 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -49,16 +49,19 @@ import qualified Data.ByteString.Char8 as B import Data.Either (fromRight, partitionEithers) import Data.Functor (($>)) import Data.Int (Int64) -import Data.List (intercalate) +import Data.List (intercalate, sort) import qualified Data.List.NonEmpty as L import qualified Data.Map.Strict as M -import Data.Maybe (isNothing) +import Data.Maybe (fromMaybe, isNothing) import qualified Data.Text as T import Data.Text.Encoding (decodeLatin1) import Data.Time.Clock (UTCTime (..), diffTimeToPicoseconds, getCurrentTime) import Data.Time.Clock.System (SystemTime (..), getSystemTime) import Data.Time.Format.ISO8601 (iso8601Show) import Data.Type.Equality +import GHC.Conc (listThreads, threadStatus) +import GHC.Conc.Sync (threadLabel) +import GHC.Stats (getRTSStats) import GHC.TypeLits (KnownNat) import Network.Socket (ServiceName, Socket, socketToHandle) import Simplex.Messaging.Agent.Lock @@ -113,8 +116,8 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do restoreServerMessages restoreServerStats raceAny_ - ( serverThread s subscribedQ subscribers subscriptions cancelSub : - serverThread s ntfSubscribedQ Env.notifiers ntfSubscriptions (\_ -> pure ()) : + ( serverThread s "server subscribedQ" subscribedQ subscribers subscriptions cancelSub : + serverThread s "server ntfSubscribedQ" ntfSubscribedQ Env.notifiers ntfSubscriptions (\_ -> pure ()) : map runServer transports <> expireMessagesThread_ cfg <> serverStatsThread_ cfg <> controlPortThread_ cfg ) `finally` withLock (savingLock s) "final" (saveServer False) @@ -130,12 +133,14 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do serverThread :: forall s. Server -> + String -> (Server -> TQueue (QueueId, Client)) -> (Server -> TMap QueueId Client) -> (Client -> TMap QueueId s) -> (s -> IO ()) -> M () - serverThread s subQ subs clientSubs unsub = forever $ do + serverThread s label subQ subs clientSubs unsub = forever $ do + labelMyThread label atomically updateSubscribers $>>= endPreviousSubscriptions >>= liftIO . mapM_ unsub @@ -152,6 +157,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do TM.lookupInsert qId clnt (subs s) $>>= clientToBeNotified endPreviousSubscriptions :: (QueueId, Client) -> M (Maybe s) endPreviousSubscriptions (qId, c) = do + labelMyThread $ label <> ".endPreviousSubscriptions" void . forkIO . atomically $ writeTBQueue (sndQ c) [(CorrId "", qId, END)] atomically $ TM.lookupDelete qId (clientSubs c) @@ -165,6 +171,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do ms <- asks msgStore quota <- asks $ msgQueueQuota . config let interval = checkInterval expCfg * 1000000 + labelMyThread "expireMessages" forever $ do liftIO $ threadDelay' interval old <- liftIO $ expireBeforeEpoch expCfg @@ -180,6 +187,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do logServerStats :: Int64 -> Int64 -> FilePath -> M () logServerStats startAt logInterval statsFilePath = do + labelMyThread "logServerStats" initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime liftIO $ putStrLn $ "server stats log enabled: " <> statsFilePath liftIO $ threadDelay' $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0) @@ -224,9 +232,10 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do threadDelay' interval runClient :: Transport c => TProxy c -> c -> M () - runClient _ h = do + runClient tp h = do kh <- asks serverIdentity smpVRange <- asks $ smpServerVRange . config + labelMyThread $ "smp handshake for " <> transportName tp liftIO (runExceptT $ smpServerHandshake h kh smpVRange) >>= \case Right th -> runClientTransport th Left _ -> pure () @@ -240,10 +249,13 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do srv <- asks server cpStarted <- newEmptyTMVarIO u <- askUnliftIO - liftIO $ runTCPServer cpStarted port $ runCPClient u srv + liftIO $ do + labelMyThread "control port server" + runTCPServer cpStarted port $ runCPClient u srv where runCPClient :: UnliftIO (ReaderT Env IO) -> Server -> Socket -> IO () runCPClient u srv sock = do + labelMyThread "control port client" h <- socketToHandle sock ReadWriteMode hSetBuffering h LineBuffering hSetNewlineMode h universalNewlineMode @@ -259,7 +271,15 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do processCP h = \case CPSuspend -> hPutStrLn h "suspend not implemented" CPResume -> hPutStrLn h "resume not implemented" - CPClients -> hPutStrLn h "clients not implemented" + CPClients -> do + Server {subscribers} <- unliftIO u $ asks server + clients <- readTVarIO subscribers + hPutStrLn h $ "Clients: " <> show (length clients) + forM_ (M.toList clients) $ \(cid, Client {sessionId, connected, activeAt, subscriptions}) -> do + hPutStrLn h . B.unpack $ "Client " <> encode cid <> " $" <> encode sessionId + readTVarIO connected >>= hPutStrLn h . (" connected: " <>) . show + readTVarIO activeAt >>= hPutStrLn h . (" activeAt: " <>) . B.unpack . strEncode + readTVarIO subscriptions >>= hPutStrLn h . (" subscriptions: " <>) . show . M.size CPStats -> do ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, msgSentNtf, msgRecvNtf, qCount, msgCount} <- unliftIO u $ asks serverStats putStat "fromTime" fromTime @@ -275,12 +295,21 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do where putStat :: Show a => String -> TVar a -> IO () putStat label var = readTVarIO var >>= \v -> hPutStrLn h $ label <> ": " <> show v + CPStatsRTS -> getRTSStats >>= hPutStrLn h . show + CPThreads -> do + threads <- liftIO listThreads + hPutStrLn h $ "Threads: " <> show (length threads) + forM_ (sort threads) $ \tid -> do + label <- threadLabel tid + status <- threadStatus tid + hPutStrLn h $ show tid <> " (" <> show status <> ") " <> fromMaybe "" label CPSave -> withLock (savingLock srv) "control" $ do hPutStrLn h "saving server state..." unliftIO u $ saveServer True hPutStrLn h "server state saved!" - CPHelp -> hPutStrLn h "commands: stats, save, help, quit" + CPHelp -> hPutStrLn h "commands: stats, stats-rts, clients, threads, save, help, quit" CPQuit -> pure () + CPSkip -> pure () runClientTransport :: Transport c => THandle c -> M () runClientTransport th@THandle {thVersion, sessionId} = do @@ -289,6 +318,7 @@ runClientTransport th@THandle {thVersion, sessionId} = do c <- atomically $ newClient q thVersion sessionId ts s <- asks server expCfg <- asks $ inactiveClientExpiration . config + labelMyThread . B.unpack $ "client $" <> encode c.sessionId raceAny_ ([liftIO $ send th c, client c s, receive th c] <> disconnectThread_ c expCfg) `finally` clientDisconnected c where @@ -319,12 +349,14 @@ cancelSub sub = _ -> return () receive :: Transport c => THandle c -> Client -> M () -receive th Client {rcvQ, sndQ, activeAt} = forever $ do - ts <- L.toList <$> liftIO (tGet th) - atomically . writeTVar activeAt =<< liftIO getSystemTime - as <- partitionEithers <$> mapM cmdAction ts - write sndQ $ fst as - write rcvQ $ snd as +receive th Client {rcvQ, sndQ, activeAt, sessionId} = do + labelMyThread . B.unpack $ "client $" <> encode sessionId <> " receive" + forever $ do + ts <- L.toList <$> liftIO (tGet th) + atomically . writeTVar activeAt =<< liftIO getSystemTime + as <- partitionEithers <$> mapM cmdAction ts + write sndQ $ fst as + write rcvQ $ snd as where cmdAction :: SignedTransmission ErrorType Cmd -> M (Either (Transmission BrokerMsg) (Maybe QueueRec, Transmission Cmd)) cmdAction (sig, signed, (corrId, queueId, cmdOrError)) = @@ -338,10 +370,12 @@ receive th Client {rcvQ, sndQ, activeAt} = forever $ do write q = mapM_ (atomically . writeTBQueue q) . L.nonEmpty send :: Transport c => THandle c -> Client -> IO () -send h@THandle {thVersion = v} Client {sndQ, sessionId, activeAt} = forever $ do - ts <- atomically $ L.sortWith tOrder <$> readTBQueue sndQ - void . liftIO . tPut h Nothing $ L.map ((Nothing,) . encodeTransmission v sessionId) ts - atomically . writeTVar activeAt =<< liftIO getSystemTime +send h@THandle {thVersion = v} Client {sndQ, sessionId, activeAt} = do + labelMyThread . B.unpack $ "client $" <> encode sessionId <> " send" + forever $ do + ts <- atomically $ L.sortWith tOrder <$> readTBQueue sndQ + void . liftIO . tPut h Nothing $ L.map ((Nothing,) . encodeTransmission v sessionId) ts + atomically . writeTVar activeAt =<< liftIO getSystemTime where tOrder :: Transmission BrokerMsg -> Int tOrder (_, _, cmd) = case cmd of @@ -350,7 +384,8 @@ send h@THandle {thVersion = v} Client {sndQ, sessionId, activeAt} = forever $ do _ -> 1 disconnectTransport :: Transport c => THandle c -> client -> (client -> TVar SystemTime) -> ExpirationConfig -> IO () -disconnectTransport THandle {connection} c activeAt expCfg = do +disconnectTransport THandle {connection, sessionId} c activeAt expCfg = do + labelMyThread . B.unpack $ "client $" <> encode sessionId <> " disconnectTransport" let interval = checkInterval expCfg * 1000000 forever . liftIO $ do threadDelay' interval @@ -406,7 +441,8 @@ dummyKeyEd448 :: C.PublicKey 'C.Ed448 dummyKeyEd448 = "MEMwBQYDK2VxAzoA6ibQc9XpkSLtwrf7PLvp81qW/etiumckVFImCMRdftcG/XopbOSaq9qyLhrgJWKOLyNrQPNVvpMA" client :: forall m. (MonadUnliftIO m, MonadReader Env m) => Client -> Server -> m () -client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscribedQ, ntfSubscribedQ, notifiers} = +client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscribedQ, ntfSubscribedQ, notifiers} = do + labelMyThread . B.unpack $ "client $" <> encode clnt.sessionId <> " commands" forever $ atomically (readTBQueue rcvQ) >>= mapM processCommand diff --git a/src/Simplex/Messaging/Server/Control.hs b/src/Simplex/Messaging/Server/Control.hs index 184e94775..e6c2e04c4 100644 --- a/src/Simplex/Messaging/Server/Control.hs +++ b/src/Simplex/Messaging/Server/Control.hs @@ -11,9 +11,12 @@ data ControlProtocol | CPResume | CPClients | CPStats + | CPStatsRTS + | CPThreads | CPSave | CPHelp | CPQuit + | CPSkip instance StrEncoding ControlProtocol where strEncode = \case @@ -21,16 +24,22 @@ instance StrEncoding ControlProtocol where CPResume -> "resume" CPClients -> "clients" CPStats -> "stats" + CPStatsRTS -> "stats-rts" + CPThreads -> "threads" CPSave -> "save" CPHelp -> "help" CPQuit -> "quit" + CPSkip -> "" strP = A.takeTill (== ' ') >>= \case "suspend" -> pure CPSuspend "resume" -> pure CPResume "clients" -> pure CPClients "stats" -> pure CPStats + "stats-rts" -> pure CPStatsRTS + "threads" -> pure CPThreads "save" -> pure CPSave "help" -> pure CPHelp "quit" -> pure CPQuit + "" -> pure CPSkip _ -> fail "bad ControlProtocol command" diff --git a/src/Simplex/Messaging/Transport/Server.hs b/src/Simplex/Messaging/Transport/Server.hs index 9ce23731d..8876135b1 100644 --- a/src/Simplex/Messaging/Transport/Server.hs +++ b/src/Simplex/Messaging/Transport/Server.hs @@ -32,7 +32,7 @@ import qualified Network.TLS as T import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Transport -import Simplex.Messaging.Util (catchAll_, tshow) +import Simplex.Messaging.Util (catchAll_, labelMyThread, tshow) import System.Exit (exitFailure) import System.Mem.Weak (Weak, deRefWeak) import UnliftIO.Concurrent @@ -63,6 +63,7 @@ runTransportServer :: forall c m. (Transport c, MonadUnliftIO m) => TMVar Bool - runTransportServer started port serverParams cfg server = do u <- askUnliftIO let tCfg = serverTransportConfig cfg + labelMyThread $ "transport server for " <> transportName (TProxy :: TProxy c) liftIO . runTCPServer started port $ \conn -> E.bracket (connectTLS Nothing tCfg serverParams conn >>= getServerConnection tCfg) diff --git a/src/Simplex/Messaging/Util.hs b/src/Simplex/Messaging/Util.hs index 9be4bec10..4e000eab5 100644 --- a/src/Simplex/Messaging/Util.hs +++ b/src/Simplex/Messaging/Util.hs @@ -9,6 +9,7 @@ import Control.Monad import Control.Monad.Except import Control.Monad.IO.Unlift import Data.Bifunctor (first) +import qualified Data.ByteString as BW import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import Data.Int (Int64) @@ -19,6 +20,8 @@ import Data.Text (Text) import qualified Data.Text as T import Data.Text.Encoding (decodeUtf8With) import Data.Time (NominalDiffTime) +import GHC.Conc +import Numeric (showHex) import UnliftIO.Async import qualified UnliftIO.Exception as UE @@ -149,3 +152,6 @@ diffToMicroseconds diff = fromIntegral ((truncate $ diff * 1000000) :: Integer) diffToMilliseconds :: NominalDiffTime -> Int64 diffToMilliseconds diff = fromIntegral ((truncate $ diff * 1000) :: Integer) + +labelMyThread :: MonadIO m => String -> m () +labelMyThread label = liftIO $ myThreadId >>= (`labelThread` label)